From 7c310c281aaa528cd675625e12d6e00308e10809 Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Thu, 24 Dec 2020 15:41:25 +0800 Subject: [PATCH 01/13] demo (not work) Signed-off-by: Liu Cong --- components/raftstore/src/store/fsm/store.rs | 184 ++++++++++++++---- .../raftstore/src/store/fsm/sync_policy.rs | 19 +- 2 files changed, 162 insertions(+), 41 deletions(-) diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 6bfb905d75e..1b86cc1894b 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -49,7 +49,7 @@ use crate::store::fsm::metrics::*; use crate::store::fsm::peer::{ maybe_destroy_source, new_admin_request, PeerFsm, PeerFsmDelegate, SenderFsmPair, }; -use crate::store::fsm::sync_policy::{new_sync_policy, SyncAction, SyncPolicy}; +use crate::store::fsm::sync_policy::{new_sync_policy, SyncAction, SyncPolicy, UnsyncedReady}; use crate::store::fsm::ApplyNotifier; use crate::store::fsm::ApplyTaskRes; use crate::store::fsm::{ @@ -82,6 +82,130 @@ const RAFT_WB_SHRINK_SIZE: usize = 1024 * 1024; pub const PENDING_MSG_CAP: usize = 100; const UNREACHABLE_BACKOFF: Duration = Duration::from_secs(10); +use std::collections::VecDeque; +use std::thread::JoinHandle; + +pub struct AsyncDBWriter +where + EK: KvEngine, + ER: RaftEngine, +{ + engine: ER, + router: RaftRouter, + tag: String, + wbs: Arc>>, + pub tx: LooseBoundedSender<(ER::LogBatch, VecDeque)>, + rx: Arc)>>, + workers: Arc>>>, +} + +impl Clone for AsyncDBWriter +where + EK: KvEngine, + ER: RaftEngine, +{ + fn clone(&self) -> Self { + AsyncDBWriter{ + engine: self.engine.clone(), + router: self.router.clone(), + tag: self.tag.clone(), + wbs: self.wbs.clone(), + tx: self.tx.clone(), + rx: self.rx.clone(), + workers: self.workers.clone(), + } + } +} + +impl AsyncDBWriter +where + EK: KvEngine, + ER: RaftEngine, +{ + pub fn new(engine: ER, router: RaftRouter, tag: String, pool_size: usize) -> AsyncDBWriter { + let (tx, rx) = mpsc::loose_bounded(pool_size * 2); + let mut async_writer = AsyncDBWriter{ + engine, + router, + tag, + wbs: Arc::new(Mutex::new(VecDeque::default())), + tx, + rx: Arc::new(rx), + workers: Arc::new(Mutex::new(vec![])), + }; + async_writer.spawn(pool_size); + async_writer + } + + fn spawn(&mut self, pool_size: usize) { + // TODO: support more than 1 write-thread + assert!(pool_size == 1); + for i in 0..pool_size { + let mut x = self.clone(); + let t = thread::Builder::new() + .name(thd_name!(format!("raftdb-async-writer-{}", i))) + .spawn(move || { + let (wb, unsynced_readies) = x.rx.recv().unwrap(); + x.sync_write(wb, unsynced_readies) + }) + .unwrap(); + // TODO: graceful exit + self.workers.lock().unwrap().push(t); + } + } + + pub fn new_wb(&mut self) -> ER::LogBatch { + let mut wbs = self.wbs.lock().unwrap(); + if wbs.is_empty() { + self.engine.log_batch(4 * 1024) + } else { + wbs.pop_front().unwrap() + } + } + + pub fn async_write(&mut self, wb: ER::LogBatch, unsynced_readies: VecDeque) { + // TODO: block if full + self.tx.force_send((wb, unsynced_readies)) + .unwrap_or_else(|e| { + panic!("{} failed to send task via channel: {:?}", self.tag, e); + }); + } + + pub fn sync_write(&mut self, mut wb: ER::LogBatch, unsynced_readies: VecDeque) { + self.engine + .consume_and_shrink(&mut wb, true, RAFT_WB_SHRINK_SIZE, 4 * 1024) + .unwrap_or_else(|e| { + panic!("{} failed to save raft append result: {:?}", self.tag, e); + }); + self.flush_unsynced_readies(unsynced_readies); + let mut wbs = self.wbs.lock().unwrap(); + 
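        // Recycle the consumed batch: new_wb() pops from this pool first,
        // so steady-state writes reuse LogBatch allocations instead of
        // calling log_batch(4 * 1024) on every round.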
wbs.push_back(wb); + } + + fn flush_unsynced_readies(&mut self, mut unsynced_readies: VecDeque) { + for r in unsynced_readies.drain(..) { + loop { + let pre_number = r.notifier.load(Ordering::Acquire); + assert_ne!(pre_number, r.number); + if pre_number > r.number { + break; + } + if pre_number == r.notifier.compare_and_swap(pre_number, r.number, Ordering::AcqRel) { + if let Err(e) = self.router.force_send(r.region_id, PeerMsg::Noop) { + error!( + "failed to send noop to trigger persisted ready"; + "region_id" => r.region_id, + "ready_number" => r.number, + "error" => ?e, + ); + } + break; + } + } + } + } +} + pub struct StoreInfo { pub engine: E, pub capacity: u64, @@ -339,6 +463,7 @@ where pub tick_batch: Vec, pub node_start_time: Option, pub sync_policy: SyncPolicy>, + pub async_writer: Arc>>, } impl HandleRaftReadyContext for PollContext @@ -376,6 +501,14 @@ where EK: KvEngine, ER: RaftEngine, { + #[inline] + pub fn detach_raft_wb(&mut self) -> ER::LogBatch { + let mut async_writer = self.async_writer.lock().unwrap(); + let mut raft_wb = async_writer.new_wb(); + mem::swap(&mut self.raft_wb, &mut raft_wb); + raft_wb + } + #[inline] pub fn store_id(&self) -> u64 { self.store.get_id() @@ -664,31 +797,8 @@ impl RaftPoller { self.poll_ctx.store_id() == 1, |_| {} ); - self.poll_ctx - .engines - .raft - .consume_and_shrink( - &mut self.poll_ctx.raft_wb, - false, - RAFT_WB_SHRINK_SIZE, - 4 * 1024, - ) - .unwrap_or_else(|e| { - panic!("{} failed to save raft append result: {:?}", self.tag, e); - }); } - let synced = if self.poll_ctx.sync_policy.delay_sync_enabled() { - self.poll_ctx.sync_policy.sync_if_needed(true) - } else { - if !raft_wb_is_empty { - self.poll_ctx.engines.raft.sync().unwrap_or_else(|e| { - panic!("{} failed to sync raft engine: {:?}", self.tag, e); - }); - } - true - }; - report_perf_context!( self.poll_ctx.perf_context_statistics, STORE_PERF_CONTEXT_TIME_HISTOGRAM_STATIC @@ -696,11 +806,7 @@ impl RaftPoller { fail_point!("raft_after_save"); if ready_cnt != 0 { - let unsynced_version = if synced { - None - } else { - Some(self.poll_ctx.sync_policy.new_unsynced_version()) - }; + let unsynced_version = Some(self.poll_ctx.sync_policy.new_unsynced_version()); let mut batch_pos = 0; let mut ready_res = mem::take(&mut self.poll_ctx.ready_res); for (ready, invoke_ctx) in ready_res.drain(..) 
{ @@ -715,6 +821,14 @@ impl RaftPoller { .post_raft_ready_append(ready, invoke_ctx, unsynced_version); } } + + if !raft_wb_is_empty { + let raft_wb = self.poll_ctx.detach_raft_wb(); + let unsynced_readies = self.poll_ctx.sync_policy.detach_unsynced_readies(); + let mut async_writer = self.poll_ctx.async_writer.lock().unwrap(); + async_writer.async_write(raft_wb, unsynced_readies); + } + let dur = self.timer.elapsed(); if !self.poll_ctx.store_stat.is_busy { let election_timeout = Duration::from_millis( @@ -797,7 +911,6 @@ impl PollHandler, St self.poll_ctx.cfg = incoming.clone(); self.poll_ctx.update_ticks_timeout(); } - self.poll_ctx.sync_policy.try_flush_readies(); } fn handle_control(&mut self, store: &mut StoreFsm) -> Option { @@ -866,7 +979,6 @@ impl PollHandler, St fn end(&mut self, peers: &mut [Box>]) { self.flush_ticks(); - self.poll_ctx.sync_policy.try_flush_readies(); if self.poll_ctx.has_ready { self.handle_raft_ready(peers); } @@ -876,18 +988,17 @@ impl PollHandler, St .process_ready .observe(duration_to_sec(self.timer.elapsed()) as f64); self.poll_ctx.raft_metrics.flush(); - self.poll_ctx.sync_policy.metrics.flush(); self.poll_ctx.store_stat.flush(); } fn pause(&mut self) -> bool { - let all_synced_and_flushed = self.poll_ctx.sync_policy.try_sync_and_flush(); if self.poll_ctx.trans.need_flush() { self.poll_ctx.trans.flush(); } // If there are cached data and go into pause status, that will cause high latency or hunger // so it should return false(means pause failed) when there are still jobs to do - all_synced_and_flushed + //all_synced_and_flushed + true } } @@ -913,6 +1024,7 @@ pub struct RaftPollerBuilder { applying_snap_count: Arc, global_replication_state: Arc>, pub sync_policy: SyncPolicy>, + pub async_writer: Arc>>, } impl RaftPollerBuilder { @@ -1123,6 +1235,7 @@ where tick_batch: vec![PeerTickBatch::default(); 256], node_start_time: Some(TiInstant::now_coarse()), sync_policy: self.sync_policy.clone(), + async_writer: self.async_writer.clone(), }; ctx.update_ticks_timeout(); let tag = format!("[store {}]", ctx.store.get_id()); @@ -1242,6 +1355,8 @@ impl RaftBatchSystem { cfg.value().delay_sync_enabled(), cfg.value().delay_sync_us as i64, ); + let async_writer = Arc::new(Mutex::new(AsyncDBWriter::new(engines.raft.clone(), + self.router.clone(), "raftstore-async-writer".to_string(), 1))); let mut builder = RaftPollerBuilder { cfg, store: meta, @@ -1264,6 +1379,7 @@ impl RaftBatchSystem { pending_create_peers: Arc::new(Mutex::new(HashMap::default())), applying_snap_count: Arc::new(AtomicUsize::new(0)), sync_policy, + async_writer, }; let region_peers = builder.init()?; let engine = builder.engines.kv.clone(); diff --git a/components/raftstore/src/store/fsm/sync_policy.rs b/components/raftstore/src/store/fsm/sync_policy.rs index 6d43acfc2f7..40154df1fea 100644 --- a/components/raftstore/src/store/fsm/sync_policy.rs +++ b/components/raftstore/src/store/fsm/sync_policy.rs @@ -4,6 +4,7 @@ use std::collections::VecDeque; use std::sync::atomic::Ordering; use std::sync::atomic::{AtomicI64, AtomicU64}; use std::sync::Arc; +use std::mem; use crossbeam::utils::CachePadded; use engine_traits::KvEngine; @@ -65,10 +66,10 @@ impl Action for SyncAction { #[derive(Default)] pub struct UnsyncedReady { - number: u64, - region_id: u64, - notifier: Arc, - version: u64, + pub number: u64, + pub region_id: u64, + pub notifier: Arc, + pub version: u64, } impl UnsyncedReady { @@ -228,13 +229,17 @@ impl SyncPolicy { notifier: Arc, version: u64, ) { - if !self.delay_sync_enabled { - return; - } + 
//if !self.delay_sync_enabled { + // return; + //} self.unsynced_readies .push_back(UnsyncedReady::new(number, region_id, notifier, version)); } + pub fn detach_unsynced_readies(&mut self) -> VecDeque { + mem::take(&mut self.unsynced_readies) + } + /// Update the global timestamps(last_sync_ts, last_plan_ts). fn update_ts_after_synced(&mut self, before_sync_ts: i64) { let last_sync_ts = self.global_last_sync_ts.load(Ordering::Acquire); From 1f62d2555c431f4d8e1a0bc1a639bbdfaeb834ae Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Thu, 24 Dec 2020 16:30:43 +0800 Subject: [PATCH 02/13] Splitted store CPU-IO workload so it could use diff conc, but task size still related Signed-off-by: Liu Cong --- components/raftstore/src/store/config.rs | 6 ++++++ components/raftstore/src/store/fsm/store.rs | 14 +++++++------- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index e0abd7d6de0..c7a19e5e652 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -30,6 +30,8 @@ pub struct Config { // delay time of raft db sync (us). #[config(skip)] pub delay_sync_us: u64, + #[config(skip)] + pub store_io_pool_size: u64, // minimizes disruption when a partitioned node rejoins the cluster by using a two phase election. #[config(skip)] pub prevote: bool, @@ -196,6 +198,7 @@ impl Default for Config { let split_size = ReadableSize::mb(coprocessor::config::SPLIT_SIZE_MB); Config { delay_sync_us: 0, + store_io_pool_size: 2, prevote: true, raftdb_path: String::new(), capacity: ReadableSize(0), @@ -421,6 +424,9 @@ impl Config { CONFIG_RAFTSTORE_GAUGE .with_label_values(&["delay_sync_us"]) .set((self.delay_sync_us as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_pool_size"]) + .set((self.store_io_pool_size as i32).into()); CONFIG_RAFTSTORE_GAUGE .with_label_values(&["prevote"]) diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 1b86cc1894b..02cbfaa4ee8 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -95,7 +95,7 @@ where tag: String, wbs: Arc>>, pub tx: LooseBoundedSender<(ER::LogBatch, VecDeque)>, - rx: Arc)>>, + rx: Arc)>>>, workers: Arc>>>, } @@ -130,7 +130,7 @@ where tag, wbs: Arc::new(Mutex::new(VecDeque::default())), tx, - rx: Arc::new(rx), + rx: Arc::new(Mutex::new(rx)), workers: Arc::new(Mutex::new(vec![])), }; async_writer.spawn(pool_size); @@ -138,15 +138,15 @@ where } fn spawn(&mut self, pool_size: usize) { - // TODO: support more than 1 write-thread - assert!(pool_size == 1); for i in 0..pool_size { let mut x = self.clone(); let t = thread::Builder::new() .name(thd_name!(format!("raftdb-async-writer-{}", i))) .spawn(move || { - let (wb, unsynced_readies) = x.rx.recv().unwrap(); - x.sync_write(wb, unsynced_readies) + loop { + let (wb, unsynced_readies) = x.rx.lock().unwrap().recv().unwrap(); + x.sync_write(wb, unsynced_readies); + } }) .unwrap(); // TODO: graceful exit @@ -1356,7 +1356,7 @@ impl RaftBatchSystem { cfg.value().delay_sync_us as i64, ); let async_writer = Arc::new(Mutex::new(AsyncDBWriter::new(engines.raft.clone(), - self.router.clone(), "raftstore-async-writer".to_string(), 1))); + self.router.clone(), "raftstore-async-writer".to_string(), cfg.value().store_io_pool_size as usize))); let mut builder = RaftPollerBuilder { cfg, store: meta, From f0a597ba43f94ad4215a0b1da7ed32e027f65306 Mon Sep 17 00:00:00 2001 From: 
Liu Cong Date: Fri, 25 Dec 2020 15:57:54 +0800 Subject: [PATCH 03/13] Splitted store CPU-IO workload so it could use diff conc, but task size still related Signed-off-by: Liu Cong --- Cargo.lock | 123 +++++++++++++++----- components/raftstore/Cargo.toml | 1 + components/raftstore/src/store/fsm/store.rs | 24 ++-- 3 files changed, 105 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 32eb0fb10a1..4573bfca278 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -215,7 +215,7 @@ dependencies = [ "prometheus", "raft", "raftstore", - "rand", + "rand 0.7.3", "security", "serde", "serde_derive", @@ -497,6 +497,15 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +[[package]] +name = "chan" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d14956a3dae065ffaa0d92ece848ab4ced88d32361e7fdfbfd653a5c454a1ed8" +dependencies = [ + "rand 0.3.23", +] + [[package]] name = "chrono" version = "0.4.11" @@ -604,7 +613,7 @@ dependencies = [ "raft", "raft_log_engine", "raftstore", - "rand", + "rand 0.7.3", "security", "serde_json", "signal", @@ -633,7 +642,7 @@ dependencies = [ "libc", "panic_hook", "protobuf", - "rand", + "rand 0.7.3", "static_assertions", "tikv_alloc", ] @@ -647,7 +656,7 @@ dependencies = [ "fail", "futures 0.3.7", "parking_lot 0.11.0", - "rand", + "rand 0.7.3", "tikv_alloc", "tokio", "txn_types", @@ -689,7 +698,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c750ec12b83377637110d5a57f5ae08e895b06c4b16e2bdbf1a94ef717428c59" dependencies = [ "proc-macro-hack", - "rand", + "rand 0.7.3", ] [[package]] @@ -755,7 +764,7 @@ dependencies = [ "itertools 0.8.2", "lazy_static", "num-traits", - "rand_core", + "rand_core 0.5.1", "rand_os", "rand_xoshiro", "rayon", @@ -1088,7 +1097,7 @@ dependencies = [ "openssl", "prometheus", "protobuf", - "rand", + "rand 0.7.3", "rusoto_core", "rusoto_credential", "rusoto_kms", @@ -1137,7 +1146,7 @@ dependencies = [ "prometheus-static-metric", "protobuf", "raft", - "rand", + "rand 0.7.3", "rocksdb", "serde", "serde_derive", @@ -1232,7 +1241,7 @@ dependencies = [ "lazy_static", "matches", "prometheus", - "rand", + "rand 0.7.3", "rusoto_core", "rusoto_mock", "rusoto_s3", @@ -1257,7 +1266,7 @@ checksum = "3be3c61c59fdc91f5dbc3ea31ee8623122ce80057058be560654c5d410d181a6" dependencies = [ "lazy_static", "log", - "rand", + "rand 0.7.3", ] [[package]] @@ -1296,7 +1305,7 @@ dependencies = [ "fs2", "lazy_static", "openssl", - "rand", + "rand 0.7.3", "tempfile", "tikv_alloc", ] @@ -1344,6 +1353,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f2a4a2034423744d2cc7ca2068453168dcdb82c438419e639a26bd87839c674" +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + [[package]] name = "fuchsia-zircon" version = "0.3.3" @@ -3092,7 +3107,7 @@ dependencies = [ "protobuf", "quick-error", "raft-proto", - "rand", + "rand 0.7.3", "slog", ] @@ -3152,6 +3167,7 @@ dependencies = [ "batch-system", "bitflags", "byteorder", + "chan", "concurrency_manager", "configuration", "crc32fast", @@ -3184,7 +3200,7 @@ dependencies = [ "quick-error", "raft", "raft-proto", - "rand", + "rand 0.7.3", "serde", "serde_derive", "serde_with", @@ -3204,6 +3220,29 @@ dependencies = [ "yatp", ] +[[package]] +name = 
"rand" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ac302d8f83c0c1974bf758f6b041c6c8ada916fbb44a609158ca8b064cc76c" +dependencies = [ + "libc", + "rand 0.4.6", +] + +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" +dependencies = [ + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "rdrand", + "winapi 0.3.8", +] + [[package]] name = "rand" version = "0.7.3" @@ -3213,7 +3252,7 @@ dependencies = [ "getrandom", "libc", "rand_chacha", - "rand_core", + "rand_core 0.5.1", "rand_hc", ] @@ -3224,9 +3263,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03a2a90da8c7523f554344f921aa97283eadf6ac484a6d2a7d0212fa7f8d6853" dependencies = [ "c2-chacha", - "rand_core", + "rand_core 0.5.1", ] +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", +] + +[[package]] +name = "rand_core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + [[package]] name = "rand_core" version = "0.5.1" @@ -3242,7 +3296,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" dependencies = [ - "rand_core", + "rand_core 0.5.1", ] [[package]] @@ -3251,7 +3305,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8df6b0b3dc9991a10b2d91a86d1129314502169a1bf6afa67328945e02498b76" dependencies = [ - "rand_core", + "rand_core 0.5.1", ] [[package]] @@ -3261,7 +3315,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a788ae3edb696cfcba1c19bfd388cc4b8c21f8a408432b199c072825084da58a" dependencies = [ "getrandom", - "rand_core", + "rand_core 0.5.1", ] [[package]] @@ -3270,7 +3324,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77d416b86801d23dde1aa643023b775c3a462efc0ed96443add11546cdf1dca8" dependencies = [ - "rand_core", + "rand_core 0.5.1", ] [[package]] @@ -3279,7 +3333,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e18c91676f670f6f0312764c759405f13afb98d5d73819840cf72a518487bff" dependencies = [ - "rand_core", + "rand_core 0.5.1", ] [[package]] @@ -3317,6 +3371,15 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", +] + [[package]] name = "redox_syscall" version = "0.1.56" @@ -4245,7 +4308,7 @@ checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" dependencies = [ "cfg-if", "libc", - "rand", + "rand 0.7.3", "redox_syscall", "remove_dir_all", "winapi 0.3.8", @@ -4333,7 +4396,7 @@ dependencies = [ "pd_client", "raft", "raftstore", - "rand", + "rand 0.7.3", "security", "slog", "slog-global", @@ -4380,7 +4443,7 @@ dependencies = [ "fail", "grpcio", "kvproto", - "rand", + "rand 0.7.3", "rand_isaac", "security", "slog", @@ -4423,7 +4486,7 @@ dependencies = [ "protobuf", "raft", "raftstore", - "rand", 
+ "rand 0.7.3", "rand_xorshift", "security", "semver 0.10.0", @@ -4614,7 +4677,7 @@ dependencies = [ "panic_hook", "profiler", "protobuf", - "rand", + "rand 0.7.3", "safemem", "static_assertions", "tidb_query_codegen", @@ -4697,7 +4760,7 @@ dependencies = [ "raft", "raft_log_engine", "raftstore", - "rand", + "rand 0.7.3", "regex", "rev_lines", "security", @@ -4817,7 +4880,7 @@ dependencies = [ "prometheus", "protobuf", "quick-error", - "rand", + "rand 0.7.3", "regex", "serde", "serde_json", @@ -5159,7 +5222,7 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11" dependencies = [ - "rand", + "rand 0.7.3", "serde", ] @@ -5415,7 +5478,7 @@ dependencies = [ "num_cpus", "parking_lot_core 0.8.0", "prometheus", - "rand", + "rand 0.7.3", ] [[package]] @@ -5430,7 +5493,7 @@ version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e12b8667a4fff63d236f8363be54392f93dbb13616be64a83e761a9319ab589" dependencies = [ - "rand", + "rand 0.7.3", ] [[package]] diff --git a/components/raftstore/Cargo.toml b/components/raftstore/Cargo.toml index abd9ed62b6d..052729a03f4 100644 --- a/components/raftstore/Cargo.toml +++ b/components/raftstore/Cargo.toml @@ -83,6 +83,7 @@ tokio = { version = "0.2", features = ["sync", "rt-threaded"] } txn_types = { path = "../txn_types" } uuid = { version = "0.8.1", features = ["serde", "v4"] } yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" } +chan = "0.1" [dev-dependencies] engine_test = { path = "../engine_test" } diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 02cbfaa4ee8..f91e14acd0c 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -84,6 +84,7 @@ const UNREACHABLE_BACKOFF: Duration = Duration::from_secs(10); use std::collections::VecDeque; use std::thread::JoinHandle; +use chan; pub struct AsyncDBWriter where @@ -94,8 +95,8 @@ where router: RaftRouter, tag: String, wbs: Arc>>, - pub tx: LooseBoundedSender<(ER::LogBatch, VecDeque)>, - rx: Arc)>>>, + pub tx: chan::Sender<(ER::LogBatch, VecDeque)>, + rx: Arc)>>, workers: Arc>>>, } @@ -122,15 +123,15 @@ where EK: KvEngine, ER: RaftEngine, { - pub fn new(engine: ER, router: RaftRouter, tag: String, pool_size: usize) -> AsyncDBWriter { - let (tx, rx) = mpsc::loose_bounded(pool_size * 2); + pub fn new(engine: ER, router: RaftRouter, tag: String, pool_size: usize, buff_size: usize) -> AsyncDBWriter { + let (tx, rx) = chan::sync(buff_size); let mut async_writer = AsyncDBWriter{ engine, router, tag, wbs: Arc::new(Mutex::new(VecDeque::default())), tx, - rx: Arc::new(Mutex::new(rx)), + rx: Arc::new(rx), workers: Arc::new(Mutex::new(vec![])), }; async_writer.spawn(pool_size); @@ -144,7 +145,7 @@ where .name(thd_name!(format!("raftdb-async-writer-{}", i))) .spawn(move || { loop { - let (wb, unsynced_readies) = x.rx.lock().unwrap().recv().unwrap(); + let (wb, unsynced_readies) = x.rx.recv().unwrap(); x.sync_write(wb, unsynced_readies); } }) @@ -164,11 +165,7 @@ where } pub fn async_write(&mut self, wb: ER::LogBatch, unsynced_readies: VecDeque) { - // TODO: block if full - self.tx.force_send((wb, unsynced_readies)) - .unwrap_or_else(|e| { - panic!("{} failed to send task via channel: {:?}", self.tag, e); - }); + self.tx.send((wb, unsynced_readies)); } pub fn sync_write(&mut self, mut wb: ER::LogBatch, unsynced_readies: VecDeque) { @@ -826,7 +823,7 @@ impl 
RaftPoller { let raft_wb = self.poll_ctx.detach_raft_wb(); let unsynced_readies = self.poll_ctx.sync_policy.detach_unsynced_readies(); let mut async_writer = self.poll_ctx.async_writer.lock().unwrap(); - async_writer.async_write(raft_wb, unsynced_readies); + async_writer.sync_write(raft_wb, unsynced_readies); } let dur = self.timer.elapsed(); @@ -1356,7 +1353,8 @@ impl RaftBatchSystem { cfg.value().delay_sync_us as i64, ); let async_writer = Arc::new(Mutex::new(AsyncDBWriter::new(engines.raft.clone(), - self.router.clone(), "raftstore-async-writer".to_string(), cfg.value().store_io_pool_size as usize))); + self.router.clone(), "raftstore-async-writer".to_string(), + cfg.value().store_io_pool_size as usize, 16))); let mut builder = RaftPollerBuilder { cfg, store: meta, From c72af06e7466a899ae03aeb2ef357dcf927035c1 Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Sun, 27 Dec 2020 19:03:13 +0800 Subject: [PATCH 04/13] Fixed issue: can not transfer leader Signed-off-by: Liu Cong --- components/raftstore/src/store/fsm/store.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index f91e14acd0c..4f447ec454f 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -803,7 +803,11 @@ impl RaftPoller { fail_point!("raft_after_save"); if ready_cnt != 0 { - let unsynced_version = Some(self.poll_ctx.sync_policy.new_unsynced_version()); + let unsynced_version = if !raft_wb_is_empty { + Some(self.poll_ctx.sync_policy.new_unsynced_version()) + } else { + None + }; let mut batch_pos = 0; let mut ready_res = mem::take(&mut self.poll_ctx.ready_res); for (ready, invoke_ctx) in ready_res.drain(..) { @@ -823,7 +827,7 @@ impl RaftPoller { let raft_wb = self.poll_ctx.detach_raft_wb(); let unsynced_readies = self.poll_ctx.sync_policy.detach_unsynced_readies(); let mut async_writer = self.poll_ctx.async_writer.lock().unwrap(); - async_writer.sync_write(raft_wb, unsynced_readies); + async_writer.async_write(raft_wb, unsynced_readies); } let dur = self.timer.elapsed(); From d6ff6e4e16616956afaa99d325445153814352d6 Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Sun, 27 Dec 2020 20:54:43 +0800 Subject: [PATCH 05/13] Fixed issue: can not transfer leader Signed-off-by: Liu Cong --- components/raftstore/src/store/fsm/store.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 4f447ec454f..c8fc211a0df 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -803,11 +803,7 @@ impl RaftPoller { fail_point!("raft_after_save"); if ready_cnt != 0 { - let unsynced_version = if !raft_wb_is_empty { - Some(self.poll_ctx.sync_policy.new_unsynced_version()) - } else { - None - }; + let unsynced_version = Some(self.poll_ctx.sync_policy.new_unsynced_version()); let mut batch_pos = 0; let mut ready_res = mem::take(&mut self.poll_ctx.ready_res); for (ready, invoke_ctx) in ready_res.drain(..) 
{ @@ -823,11 +819,14 @@ impl RaftPoller { } } + let unsynced_readies = self.poll_ctx.sync_policy.detach_unsynced_readies(); if !raft_wb_is_empty { let raft_wb = self.poll_ctx.detach_raft_wb(); - let unsynced_readies = self.poll_ctx.sync_policy.detach_unsynced_readies(); let mut async_writer = self.poll_ctx.async_writer.lock().unwrap(); async_writer.async_write(raft_wb, unsynced_readies); + } else { + let mut async_writer = self.poll_ctx.async_writer.lock().unwrap(); + async_writer.flush_unsynced_readies(unsynced_readies); } let dur = self.timer.elapsed(); From 373913b8a44025493f39f31f68a284b41ced1df4 Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Mon, 28 Dec 2020 17:43:49 +0800 Subject: [PATCH 06/13] Add metrics Signed-off-by: Liu Cong --- components/raftstore/src/store/fsm/store.rs | 22 ++++++++++++ components/raftstore/src/store/metrics.rs | 37 +++++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index c8fc211a0df..54d0a5d5050 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -84,6 +84,7 @@ const UNREACHABLE_BACKOFF: Duration = Duration::from_secs(10); use std::collections::VecDeque; use std::thread::JoinHandle; +use prometheus::IntGauge; use chan; pub struct AsyncDBWriter @@ -98,6 +99,7 @@ where pub tx: chan::Sender<(ER::LogBatch, VecDeque)>, rx: Arc)>>, workers: Arc>>>, + metrics_queue_size: IntGauge, } impl Clone for AsyncDBWriter @@ -114,6 +116,7 @@ where tx: self.tx.clone(), rx: self.rx.clone(), workers: self.workers.clone(), + metrics_queue_size: self.metrics_queue_size.clone(), } } } @@ -133,6 +136,8 @@ where tx, rx: Arc::new(rx), workers: Arc::new(Mutex::new(vec![])), + metrics_queue_size: ASYNC_WRITER_IO_QUEUE_VEC + .with_label_values(&["raft-log"]), }; async_writer.spawn(pool_size); async_writer @@ -145,8 +150,11 @@ where .name(thd_name!(format!("raftdb-async-writer-{}", i))) .spawn(move || { loop { + let now = TiInstant::now_coarse(); let (wb, unsynced_readies) = x.rx.recv().unwrap(); x.sync_write(wb, unsynced_readies); + STORE_WRITE_RAFTDB_TICK_DURATION_HISTOGRAM + .observe(duration_to_sec(now.elapsed()) as f64); } }) .unwrap(); @@ -165,15 +173,19 @@ where } pub fn async_write(&mut self, wb: ER::LogBatch, unsynced_readies: VecDeque) { + self.metrics_queue_size.inc(); self.tx.send((wb, unsynced_readies)); } pub fn sync_write(&mut self, mut wb: ER::LogBatch, unsynced_readies: VecDeque) { + let now = TiInstant::now_coarse(); self.engine .consume_and_shrink(&mut wb, true, RAFT_WB_SHRINK_SIZE, 4 * 1024) .unwrap_or_else(|e| { panic!("{} failed to save raft append result: {:?}", self.tag, e); }); + STORE_WRITE_RAFTDB_DURATION_HISTOGRAM.observe(duration_to_sec(now.elapsed()) as f64); + self.metrics_queue_size.dec(); self.flush_unsynced_readies(unsynced_readies); let mut wbs = self.wbs.lock().unwrap(); wbs.push_back(wb); @@ -750,6 +762,7 @@ pub struct RaftPoller>, previous_metrics: RaftMetrics, timer: TiInstant, + loop_timer: TiInstant, poll_ctx: PollContext, messages_per_tick: usize, cfg_tracker: Tracker, @@ -768,6 +781,7 @@ impl RaftPoller { let ready_cnt = self.poll_ctx.ready_res.len(); self.poll_ctx.raft_metrics.ready.has_ready_region += ready_cnt as u64; fail_point!("raft_before_save"); + let now = TiInstant::now_coarse(); if !self.poll_ctx.kv_wb.is_empty() { let mut write_opts = WriteOptions::new(); write_opts.set_sync(true); @@ -786,6 +800,8 @@ impl RaftPoller { } } fail_point!("raft_between_save"); + 
STORE_WRITE_KVDB_DURATION_HISTOGRAM + .observe(duration_to_sec(now.elapsed()) as f64); let raft_wb_is_empty = self.poll_ctx.raft_wb.is_empty(); if !raft_wb_is_empty { @@ -889,6 +905,9 @@ impl PollHandler, St self.poll_ctx.sync_log = false; self.poll_ctx.has_ready = false; self.timer = TiInstant::now_coarse(); + STORE_LOOP_DURATION_HISTOGRAM + .observe(duration_to_sec(self.loop_timer.elapsed()) as f64); + self.loop_timer = TiInstant::now_coarse(); // update config self.poll_ctx.perf_context_statistics.start(); if let Some(incoming) = self.cfg_tracker.any_new() { @@ -987,6 +1006,8 @@ impl PollHandler, St .raft_metrics .process_ready .observe(duration_to_sec(self.timer.elapsed()) as f64); + STORE_LOOP_WORK_DURATION_HISTOGRAM + .observe(duration_to_sec(self.loop_timer.elapsed()) as f64); self.poll_ctx.raft_metrics.flush(); self.poll_ctx.store_stat.flush(); } @@ -1245,6 +1266,7 @@ where peer_msg_buf: Vec::with_capacity(ctx.cfg.messages_per_tick), previous_metrics: ctx.raft_metrics.clone(), timer: TiInstant::now_coarse(), + loop_timer: TiInstant::now_coarse(), messages_per_tick: ctx.cfg.messages_per_tick, poll_ctx: ctx, cfg_tracker: self.cfg.clone().tracker(tag), diff --git a/components/raftstore/src/store/metrics.rs b/components/raftstore/src/store/metrics.rs index 2b0ac6901c3..e1ab55cf9e4 100644 --- a/components/raftstore/src/store/metrics.rs +++ b/components/raftstore/src/store/metrics.rs @@ -180,6 +180,43 @@ make_auto_flush_static_metric! { } lazy_static! { + pub static ref ASYNC_WRITER_IO_QUEUE_VEC: IntGaugeVec = + register_int_gauge_vec!( + "tikv_raftstore_async_writer_io_queue_total", + "Current pending + running io tasks.", + &["name"] + ).unwrap(); + pub static ref STORE_LOOP_DURATION_HISTOGRAM: Histogram = + register_histogram!( + "tikv_raftstore_store_loop_duration_seconds", + "TODO", + exponential_buckets(0.0005, 2.0, 20).unwrap() + ).unwrap(); + pub static ref STORE_LOOP_WORK_DURATION_HISTOGRAM: Histogram = + register_histogram!( + "tikv_raftstore_store_loop_work_duration_seconds", + "TODO", + exponential_buckets(0.0005, 2.0, 20).unwrap() + ).unwrap(); + pub static ref STORE_WRITE_KVDB_DURATION_HISTOGRAM: Histogram = + register_histogram!( + "tikv_raftstore_store_write_kvdb_duration_seconds", + "TODO", + exponential_buckets(0.0005, 2.0, 20).unwrap() + ).unwrap(); + pub static ref STORE_WRITE_RAFTDB_TICK_DURATION_HISTOGRAM: Histogram = + register_histogram!( + "tikv_raftstore_store_write_raftdb_tick_duration_seconds", + "TODO", + exponential_buckets(0.0005, 2.0, 20).unwrap() + ).unwrap(); + pub static ref STORE_WRITE_RAFTDB_DURATION_HISTOGRAM: Histogram = + register_histogram!( + "tikv_raftstore_store_write_raftdb_duration_seconds", + "TODO", + exponential_buckets(0.0005, 2.0, 20).unwrap() + ).unwrap(); + pub static ref PEER_PROPOSAL_COUNTER_VEC: IntCounterVec = register_int_counter_vec!( "tikv_raftstore_proposal_total", From 8571bff48bddb21a45401fab5df3cd5a2a7fe636 Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Tue, 29 Dec 2020 11:52:38 +0800 Subject: [PATCH 07/13] Add io config entries Signed-off-by: Liu Cong --- components/raftstore/src/store/config.rs | 12 ++++++++++++ components/raftstore/src/store/fsm/apply.rs | 5 ++++- components/raftstore/src/store/fsm/store.rs | 2 +- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index c7a19e5e652..a8e1c200899 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -32,6 +32,10 @@ pub struct 
Config { pub delay_sync_us: u64, #[config(skip)] pub store_io_pool_size: u64, + #[config(skip)] + pub store_io_queue: u64, + #[config(skip)] + pub apply_io_size: u64, // minimizes disruption when a partitioned node rejoins the cluster by using a two phase election. #[config(skip)] pub prevote: bool, @@ -199,6 +203,8 @@ impl Default for Config { Config { delay_sync_us: 0, store_io_pool_size: 2, + store_io_queue: 1, + apply_io_size: 1024 * 32, prevote: true, raftdb_path: String::new(), capacity: ReadableSize(0), @@ -427,6 +433,12 @@ impl Config { CONFIG_RAFTSTORE_GAUGE .with_label_values(&["store_io_pool_size"]) .set((self.store_io_pool_size as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_queue"]) + .set((self.store_io_queue as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["apply_io_size"]) + .set((self.apply_io_size as i32).into()); CONFIG_RAFTSTORE_GAUGE .with_label_values(&["prevote"]) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 644ad98b0a9..e9d926a7630 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -353,6 +353,8 @@ where yield_duration: Duration, + apply_io_size: usize, + store_id: u64, /// region_id -> (peer_id, is_splitting) /// Used for handling race between splitting and creating new peer. @@ -402,6 +404,7 @@ where use_delete_range: cfg.use_delete_range, perf_context_statistics: PerfContextStatistics::new(cfg.perf_level), yield_duration: cfg.apply_yield_duration.0, + apply_io_size: cfg.apply_io_size as usize, store_id, pending_create_peers, } @@ -922,7 +925,7 @@ where if !data.is_empty() { let cmd = util::parse_data_at(data, index, &self.tag); - if should_write_to_engine(&cmd) || apply_ctx.kv_wb().should_write_to_engine() { + if should_write_to_engine(&cmd) || apply_ctx.kv_wb().data_size() >= apply_ctx.apply_io_size { apply_ctx.commit(self); if let Some(start) = self.handle_start.as_ref() { if start.elapsed() >= apply_ctx.yield_duration { diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 54d0a5d5050..41feef40795 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -1379,7 +1379,7 @@ impl RaftBatchSystem { ); let async_writer = Arc::new(Mutex::new(AsyncDBWriter::new(engines.raft.clone(), self.router.clone(), "raftstore-async-writer".to_string(), - cfg.value().store_io_pool_size as usize, 16))); + cfg.value().store_io_pool_size as usize, cfg.value().store_io_queue as usize))); let mut builder = RaftPollerBuilder { cfg, store: meta, From 1e3bc7065bb87b549b18071e9a9d3f8c80d0c4f4 Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Mon, 4 Jan 2021 17:37:15 +0800 Subject: [PATCH 08/13] Merge region IO Signed-off-by: Liu Cong --- components/raftstore/src/store/config.rs | 2 +- components/raftstore/src/store/fsm/store.rs | 190 ++++++++++-------- components/raftstore/src/store/metrics.rs | 6 + components/raftstore/src/store/peer.rs | 3 +- .../raftstore/src/store/peer_storage.rs | 71 +++++-- 5 files changed, 170 insertions(+), 102 deletions(-) diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index a8e1c200899..ed91d223406 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -204,7 +204,7 @@ impl Default for Config { delay_sync_us: 0, store_io_pool_size: 2, store_io_queue: 1, - apply_io_size: 1024 * 32, + apply_io_size: 0, 
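            // NOTE: combined with the data_size() check added in PATCH 07,
            // a 0 here makes `kv_wb().data_size() >= apply_io_size` trivially
            // true (a flush per command); PATCH 09 below redefines 0 to mean
            // 'fall back to WriteBatch::should_write_to_engine()'.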
prevote: true, raftdb_path: String::new(), capacity: ReadableSize(0), diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 41feef40795..bf0c0d1af7e 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -82,12 +82,11 @@ const RAFT_WB_SHRINK_SIZE: usize = 1024 * 1024; pub const PENDING_MSG_CAP: usize = 100; const UNREACHABLE_BACKOFF: Duration = Duration::from_secs(10); +use crate::store::peer_storage::AsyncWriterTask; use std::collections::VecDeque; use std::thread::JoinHandle; -use prometheus::IntGauge; -use chan; -pub struct AsyncDBWriter +pub struct AsyncWriter where EK: KvEngine, ER: RaftEngine, @@ -95,49 +94,50 @@ where engine: ER, router: RaftRouter, tag: String, - wbs: Arc>>, - pub tx: chan::Sender<(ER::LogBatch, VecDeque)>, - rx: Arc)>>, + buff_size: usize, + tasks: Arc>>>, workers: Arc>>>, - metrics_queue_size: IntGauge, } -impl Clone for AsyncDBWriter +impl Clone for AsyncWriter where EK: KvEngine, ER: RaftEngine, { fn clone(&self) -> Self { - AsyncDBWriter{ + AsyncWriter{ engine: self.engine.clone(), router: self.router.clone(), tag: self.tag.clone(), - wbs: self.wbs.clone(), - tx: self.tx.clone(), - rx: self.rx.clone(), + buff_size: self.buff_size, + tasks: self.tasks.clone(), workers: self.workers.clone(), - metrics_queue_size: self.metrics_queue_size.clone(), } } } -impl AsyncDBWriter +impl AsyncWriter where EK: KvEngine, ER: RaftEngine, { - pub fn new(engine: ER, router: RaftRouter, tag: String, pool_size: usize, buff_size: usize) -> AsyncDBWriter { - let (tx, rx) = chan::sync(buff_size); - let mut async_writer = AsyncDBWriter{ + pub fn new(engine: ER, router: RaftRouter, tag: String, pool_size: usize, buff_size: usize) -> AsyncWriter { + let mut tasks = VecDeque::default(); + for _ in 0..(pool_size + 1) { + tasks.push_back( + AsyncWriterTask { + wb: engine.log_batch(4 * 1024), + unsynced_readies: HashMap::default(), + } + ); + } + let mut async_writer = AsyncWriter{ engine, router, tag, - wbs: Arc::new(Mutex::new(VecDeque::default())), - tx, - rx: Arc::new(rx), + buff_size, + tasks: Arc::new(Mutex::new(tasks)), workers: Arc::new(Mutex::new(vec![])), - metrics_queue_size: ASYNC_WRITER_IO_QUEUE_VEC - .with_label_values(&["raft-log"]), }; async_writer.spawn(pool_size); async_writer @@ -149,10 +149,32 @@ where let t = thread::Builder::new() .name(thd_name!(format!("raftdb-async-writer-{}", i))) .spawn(move || { + let mut sleep = false; loop { + // TODO: block if no data in current raft_wb + if sleep { + let d = Duration::from_millis(1); + thread::sleep(d); + } + sleep = false; let now = TiInstant::now_coarse(); - let (wb, unsynced_readies) = x.rx.recv().unwrap(); - x.sync_write(wb, unsynced_readies); + // TODO: block if too many data in current raft_wb + let task = { + let mut tasks = x.tasks.lock().unwrap(); + if tasks.front().unwrap().unsynced_readies.is_empty() { + sleep = true; + continue; + } + tasks.pop_front().unwrap() + }; + + let wb = x.sync_write(task.wb, task.unsynced_readies); + + { + let mut tasks = x.tasks.lock().unwrap(); + tasks.push_back(AsyncWriterTask{wb, unsynced_readies: HashMap::default()}); + } + STORE_WRITE_RAFTDB_TICK_DURATION_HISTOGRAM .observe(duration_to_sec(now.elapsed()) as f64); } @@ -163,21 +185,12 @@ where } } - pub fn new_wb(&mut self) -> ER::LogBatch { - let mut wbs = self.wbs.lock().unwrap(); - if wbs.is_empty() { - self.engine.log_batch(4 * 1024) - } else { - wbs.pop_front().unwrap() - } - } - - pub fn async_write(&mut self, wb: 
ER::LogBatch, unsynced_readies: VecDeque) { - self.metrics_queue_size.inc(); - self.tx.send((wb, unsynced_readies)); + pub fn raft_wb_pool(&mut self) -> Arc>>> { + self.tasks.clone() } - pub fn sync_write(&mut self, mut wb: ER::LogBatch, unsynced_readies: VecDeque) { + // TODO: this func is assumed in tasks.locked status + pub fn sync_write(&mut self, mut wb: ER::LogBatch, mut unsynced_readies: HashMap) -> ER::LogBatch { let now = TiInstant::now_coarse(); self.engine .consume_and_shrink(&mut wb, true, RAFT_WB_SHRINK_SIZE, 4 * 1024) @@ -185,31 +198,41 @@ where panic!("{} failed to save raft append result: {:?}", self.tag, e); }); STORE_WRITE_RAFTDB_DURATION_HISTOGRAM.observe(duration_to_sec(now.elapsed()) as f64); - self.metrics_queue_size.dec(); - self.flush_unsynced_readies(unsynced_readies); - let mut wbs = self.wbs.lock().unwrap(); - wbs.push_back(wb); + self.flush_unsynced_readies(&unsynced_readies); + unsynced_readies.clear(); + wb } - fn flush_unsynced_readies(&mut self, mut unsynced_readies: VecDeque) { + fn flush_unsynced_readies(&mut self, unsynced_readies: &HashMap) { + for (_, r) in unsynced_readies { + self.flush_unsynced_ready(r); + } + } + + fn drain_flush_unsynced_readies(&mut self, mut unsynced_readies: VecDeque) { for r in unsynced_readies.drain(..) { - loop { - let pre_number = r.notifier.load(Ordering::Acquire); - assert_ne!(pre_number, r.number); - if pre_number > r.number { - break; - } - if pre_number == r.notifier.compare_and_swap(pre_number, r.number, Ordering::AcqRel) { - if let Err(e) = self.router.force_send(r.region_id, PeerMsg::Noop) { - error!( - "failed to send noop to trigger persisted ready"; - "region_id" => r.region_id, - "ready_number" => r.number, - "error" => ?e, - ); - } - break; + self.flush_unsynced_ready(&r); + } + } + + fn flush_unsynced_ready(&mut self, r: &UnsyncedReady) { + loop { + let pre_number = r.notifier.load(Ordering::Acquire); + // TODO: reduce duplicated messages + //assert_ne!(pre_number, r.number); + if pre_number >= r.number { + break; + } + if pre_number == r.notifier.compare_and_swap(pre_number, r.number, Ordering::AcqRel) { + if let Err(e) = self.router.force_send(r.region_id, PeerMsg::Noop) { + error!( + "failed to send noop to trigger persisted ready"; + "region_id" => r.region_id, + "ready_number" => r.number, + "error" => ?e, + ); } + break; } } } @@ -462,7 +485,8 @@ where pub store_stat: LocalStoreStat, pub engines: Engines, pub kv_wb: EK::WriteBatch, - pub raft_wb: ER::LogBatch, + //pub raft_wb: ER::LogBatch, + pub raft_wb_is_empty: bool, pub pending_count: usize, pub sync_log: bool, pub has_ready: bool, @@ -472,7 +496,7 @@ where pub tick_batch: Vec, pub node_start_time: Option, pub sync_policy: SyncPolicy>, - pub async_writer: Arc>>, + pub async_writer: Arc>>, } impl HandleRaftReadyContext for PollContext @@ -480,8 +504,10 @@ where EK: KvEngine, ER: RaftEngine, { - fn wb_mut(&mut self) -> (&mut EK::WriteBatch, &mut ER::LogBatch) { - (&mut self.kv_wb, &mut self.raft_wb) + fn wb_mut(&mut self) -> (&mut EK::WriteBatch, Arc::>>>) { + self.raft_wb_is_empty = false; + let mut async_writer = self.async_writer.lock().unwrap(); + (&mut self.kv_wb, async_writer.raft_wb_pool()) } #[inline] @@ -490,8 +516,10 @@ where } #[inline] - fn raft_wb_mut(&mut self) -> &mut ER::LogBatch { - &mut self.raft_wb + fn raft_wb_pool(&mut self) -> Arc>>> { + self.raft_wb_is_empty = false; + let mut async_writer = self.async_writer.lock().unwrap(); + async_writer.raft_wb_pool() } #[inline] @@ -510,14 +538,6 @@ where EK: KvEngine, ER: RaftEngine, { - 
#[inline] - pub fn detach_raft_wb(&mut self) -> ER::LogBatch { - let mut async_writer = self.async_writer.lock().unwrap(); - let mut raft_wb = async_writer.new_wb(); - mem::swap(&mut self.raft_wb, &mut raft_wb); - raft_wb - } - #[inline] pub fn store_id(&self) -> u64 { self.store.get_id() @@ -774,7 +794,7 @@ impl RaftPoller { // the id of slow store in tests. fail_point!("on_raft_ready", self.poll_ctx.store_id() == 3, |_| {}); if self.poll_ctx.trans.need_flush() - && (!self.poll_ctx.kv_wb.is_empty() || !self.poll_ctx.raft_wb.is_empty()) + && (!self.poll_ctx.kv_wb.is_empty() || !self.poll_ctx.raft_wb_is_empty) { self.poll_ctx.trans.flush(); } @@ -803,7 +823,7 @@ impl RaftPoller { STORE_WRITE_KVDB_DURATION_HISTOGRAM .observe(duration_to_sec(now.elapsed()) as f64); - let raft_wb_is_empty = self.poll_ctx.raft_wb.is_empty(); + let raft_wb_is_empty = self.poll_ctx.raft_wb_is_empty; if !raft_wb_is_empty { fail_point!( "raft_before_save_on_store_1", @@ -835,15 +855,22 @@ impl RaftPoller { } } - let unsynced_readies = self.poll_ctx.sync_policy.detach_unsynced_readies(); if !raft_wb_is_empty { - let raft_wb = self.poll_ctx.detach_raft_wb(); + // Do nothing + /* Sync write testing let mut async_writer = self.poll_ctx.async_writer.lock().unwrap(); - async_writer.async_write(raft_wb, unsynced_readies); + let raft_wb_pool = async_writer.raft_wb_pool(); + let mut raft_wbs = raft_wb_pool.lock().unwrap(); + let task = raft_wbs.pop_front().unwrap(); + let wb = async_writer.sync_write(task.wb, task.unsynced_readies); + raft_wbs.push_back(AsyncWriterTask{wb, unsynced_readies: HashMap::default()}); + */ } else { + let unsynced_readies = self.poll_ctx.sync_policy.detach_unsynced_readies(); let mut async_writer = self.poll_ctx.async_writer.lock().unwrap(); - async_writer.flush_unsynced_readies(unsynced_readies); + async_writer.drain_flush_unsynced_readies(unsynced_readies); } + self.poll_ctx.raft_wb_is_empty = true; let dur = self.timer.elapsed(); if !self.poll_ctx.store_stat.is_busy { @@ -1045,7 +1072,7 @@ pub struct RaftPollerBuilder { applying_snap_count: Arc, global_replication_state: Arc>, pub sync_policy: SyncPolicy>, - pub async_writer: Arc>>, + pub async_writer: Arc>>, } impl RaftPollerBuilder { @@ -1246,7 +1273,8 @@ where store_stat: self.global_stat.local(), engines: self.engines.clone(), kv_wb: self.engines.kv.write_batch(), - raft_wb: self.engines.raft.log_batch(4 * 1024), + //raft_wb: self.engines.raft.log_batch(4 * 1024), + raft_wb_is_empty: true, pending_count: 0, sync_log: false, has_ready: false, @@ -1377,7 +1405,7 @@ impl RaftBatchSystem { cfg.value().delay_sync_enabled(), cfg.value().delay_sync_us as i64, ); - let async_writer = Arc::new(Mutex::new(AsyncDBWriter::new(engines.raft.clone(), + let async_writer = Arc::new(Mutex::new(AsyncWriter::new(engines.raft.clone(), self.router.clone(), "raftstore-async-writer".to_string(), cfg.value().store_io_pool_size as usize, cfg.value().store_io_queue as usize))); let mut builder = RaftPollerBuilder { diff --git a/components/raftstore/src/store/metrics.rs b/components/raftstore/src/store/metrics.rs index e1ab55cf9e4..5f4db195caf 100644 --- a/components/raftstore/src/store/metrics.rs +++ b/components/raftstore/src/store/metrics.rs @@ -216,6 +216,12 @@ lazy_static! 
{ "TODO", exponential_buckets(0.0005, 2.0, 20).unwrap() ).unwrap(); + pub static ref RAFT_ASYNC_WRITER_QUEUE_SIZE: Histogram = + register_histogram!( + "tikv_raftstore_store_write_queue_size", + "TODO", + exponential_buckets(1.0, 2.0, 20).unwrap() + ).unwrap(); pub static ref PEER_PROPOSAL_COUNTER_VEC: IntCounterVec = register_int_counter_vec!( diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index 0a770b0a95f..cb495e8c76c 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -1685,9 +1685,10 @@ where self.handle_raft_committed_entries(ctx, ready.take_committed_entries()); } + let notifier = self.persisted_notifier.clone(); let invoke_ctx = match self .mut_store() - .handle_raft_ready(ctx, &mut ready, destroy_regions) + .handle_raft_ready(ctx, &mut ready, destroy_regions, notifier) { Ok(r) => r, Err(e) => { diff --git a/components/raftstore/src/store/peer_storage.rs b/components/raftstore/src/store/peer_storage.rs index 56986b52244..2a02fcc17b8 100644 --- a/components/raftstore/src/store/peer_storage.rs +++ b/components/raftstore/src/store/peer_storage.rs @@ -26,11 +26,16 @@ use crate::{Error, Result}; use engine_traits::{RaftEngine, RaftLogBatch}; use into_other::into_other; use tikv_util::worker::Scheduler; +use std::sync::Mutex; use super::metrics::*; use super::worker::RegionTask; use super::{SnapEntry, SnapKey, SnapManager, SnapshotStatistics}; +use tikv_util::collections::HashMap; +use crate::store::fsm::sync_policy::UnsyncedReady; +use std::sync::atomic::AtomicU64; + // When we create a region peer, we should initialize its log term/index > 0, // so that we can force the follower peer to sync the snapshot first. pub const RAFT_INIT_LOG_TERM: u64 = 5; @@ -304,15 +309,24 @@ impl Drop for EntryCache { } } +pub struct AsyncWriterTask +where + WR: RaftLogBatch, +{ + pub wb: WR, + pub unsynced_readies: HashMap, +} + pub trait HandleRaftReadyContext where WK: Mutable, WR: RaftLogBatch, { /// Returns the mutable references of WriteBatch for both KvDB and RaftDB in one interface. - fn wb_mut(&mut self) -> (&mut WK, &mut WR); + fn wb_mut(&mut self) -> (&mut WK, Arc>>>); fn kv_wb_mut(&mut self) -> &mut WK; - fn raft_wb_mut(&mut self) -> &mut WR; + //fn raft_wb_mut(&mut self) -> &mut WR; + fn raft_wb_pool(&mut self) -> Arc>>>; fn sync_log(&self) -> bool; fn set_sync_log(&mut self, sync: bool); } @@ -1016,6 +1030,8 @@ where invoke_ctx: &mut InvokeContext, entries: Vec, ready_ctx: &mut H, + ready_number: u64, + region_notifier: Arc, ) -> Result { let region_id = self.get_region_id(); debug!( @@ -1043,13 +1059,17 @@ where cache.append(&self.tag, &entries); } - ready_ctx.raft_wb_mut().append(region_id, entries)?; - - // Delete any previously appended log entries which never committed. - // TODO: Wrap it as an engine::Error. - ready_ctx - .raft_wb_mut() - .cut_logs(region_id, last_index + 1, prev_last_index); + { + let raft_wb_pool = ready_ctx.raft_wb_pool(); + let mut raft_wbs = raft_wb_pool.lock().unwrap(); + let current = raft_wbs.front_mut().unwrap(); + current.wb.append(region_id, entries)?; + current.unsynced_readies.insert(region_id, + UnsyncedReady{number: ready_number, region_id, notifier: region_notifier, version: 0}); + // Delete any previously appended log entries which never committed. + // TODO: Wrap it as an engine::Error. 
+ current.wb.cut_logs(region_id, last_index + 1, prev_last_index); + } invoke_ctx.raft_state.set_last_index(last_index); invoke_ctx.last_term = last_term; @@ -1365,14 +1385,20 @@ where ready_ctx: &mut H, ready: &mut Ready, destroy_regions: Vec, + region_notifier: Arc, ) -> Result { + let region_id = self.get_region_id(); let mut ctx = InvokeContext::new(self); let snapshot_index = if ready.snapshot().is_empty() { 0 } else { fail_point!("raft_before_apply_snap"); - let (kv_wb, raft_wb) = ready_ctx.wb_mut(); - self.apply_snapshot(&mut ctx, ready.snapshot(), kv_wb, raft_wb, &destroy_regions)?; + let (kv_wb, raft_wb_pool) = ready_ctx.wb_mut(); + let mut raft_wbs = raft_wb_pool.lock().unwrap(); + let current = raft_wbs.front_mut().unwrap(); + self.apply_snapshot(&mut ctx, ready.snapshot(), kv_wb, &mut current.wb, &destroy_regions)?; + current.unsynced_readies.insert(region_id, + UnsyncedReady{number: ready.number(), region_id, notifier: region_notifier.clone(), version: 0}); fail_point!("raft_after_apply_snap"); ctx.destroyed_regions = destroy_regions; @@ -1381,7 +1407,7 @@ where }; if !ready.entries().is_empty() { - self.append(&mut ctx, ready.take_entries(), ready_ctx)?; + self.append(&mut ctx, ready.take_entries(), ready_ctx, ready.number(), region_notifier.clone())?; } // Last index is 0 means the peer is created from raft message @@ -1394,7 +1420,12 @@ where // Save raft state if it has changed or there is a snapshot. if ctx.raft_state != self.raft_state || snapshot_index > 0 { - ctx.save_raft_state_to(ready_ctx.raft_wb_mut())?; + { + let raft_wb_pool = ready_ctx.raft_wb_pool(); + let mut raft_wbs = raft_wb_pool.lock().unwrap(); + let current = raft_wbs.front_mut().unwrap(); + ctx.save_raft_state_to(&mut current.wb)?; + } if snapshot_index > 0 { // in case of restart happen when we just write region state to Applying, // but not write raft_local_state to raft rocksdb in time. 
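
The hunks above are the heart of this patch: pollers no longer own a private raft_wb but append into the front AsyncWriterTask of a shared queue, while writer threads pop finished tasks from the front, sync them, and recycle the emptied batch to the back. A minimal standalone sketch of that handoff discipline, assuming toy types (Task below stands in for AsyncWriterTask<ER::LogBatch>; the payloads are plain ids, not the TiKV types):

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

// Illustrative stand-in for AsyncWriterTask<ER::LogBatch>.
struct Task {
    wb: Vec<u64>,      // stand-in write batch: raft entry ids
    readies: Vec<u64>, // stand-in unsynced ready numbers
}

fn main() {
    // Pool of pool_size + 1 tasks, as in AsyncWriter::new.
    let tasks = Arc::new(Mutex::new(VecDeque::from(vec![
        Task { wb: vec![], readies: vec![] },
        Task { wb: vec![], readies: vec![] },
    ])));

    // Poller side: always append to the *front* task under the lock.
    {
        let mut q = tasks.lock().unwrap();
        let front = q.front_mut().unwrap();
        front.wb.push(42);
        front.readies.push(7);
    }

    // Writer side: pop the front only if it has pending readies, "sync"
    // it outside poller work, then recycle an emptied task to the back.
    let worker = {
        let tasks = Arc::clone(&tasks);
        thread::spawn(move || {
            let task = {
                let mut q = tasks.lock().unwrap();
                if q.front().unwrap().readies.is_empty() {
                    return; // nothing to sync yet
                }
                q.pop_front().unwrap()
            };
            println!("synced {} entries, notify {:?}", task.wb.len(), task.readies);
            tasks.lock().unwrap().push_back(Task { wb: vec![], readies: vec![] });
        })
    };
    worker.join().unwrap();
}

The invariant the sketch encodes is the one the remaining hunks rely on: only the front task is ever mutated by pollers, so a popped task is no longer shared and can be synced without blocking the raftstore loop.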
@@ -1642,7 +1673,7 @@ pub fn write_peer_state( kv_wb.put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), ®ion_state)?; Ok(()) } - +/* #[cfg(test)] mod tests { use crate::coprocessor::CoprocessorHost; @@ -1705,14 +1736,14 @@ mod tests { } impl HandleRaftReadyContext for ReadyContext { - fn wb_mut(&mut self) -> (&mut KvTestWriteBatch, &mut RaftTestWriteBatch) { - (&mut self.kv_wb, &mut self.raft_wb) - } + //fn wb_mut(&mut self) -> (&mut KvTestWriteBatch, &mut RaftTestWriteBatch) { + // (&mut self.kv_wb, &mut self.raft_wb) + //} fn kv_wb_mut(&mut self) -> &mut KvTestWriteBatch { &mut self.kv_wb } - fn raft_wb_mut(&mut self) -> &mut RaftTestWriteBatch { - &mut self.raft_wb + fn raft_wb_pool(&mut self) -> &mut RaftTestWriteBatch { + &mut self.raft_wbs } fn sync_log(&self) -> bool { self.sync_log @@ -2621,3 +2652,5 @@ mod tests { assert!(build_storage().is_err()); } } + +*/ From 2526754ef505440ae7750385b19ea41c01fae501 Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Mon, 4 Jan 2021 17:46:09 +0800 Subject: [PATCH 09/13] Support both style of apply IO size Signed-off-by: Liu Cong --- components/raftstore/src/store/fsm/apply.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index e9d926a7630..ee8f89ef14a 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -925,7 +925,9 @@ where if !data.is_empty() { let cmd = util::parse_data_at(data, index, &self.tag); - if should_write_to_engine(&cmd) || apply_ctx.kv_wb().data_size() >= apply_ctx.apply_io_size { + if should_write_to_engine(&cmd) || + (apply_ctx.apply_io_size != 0 && (apply_ctx.kv_wb().data_size() >= apply_ctx.apply_io_size)) || + (apply_ctx.apply_io_size == 0 && apply_ctx.kv_wb().should_write_to_engine()) { apply_ctx.commit(self); if let Some(start) = self.handle_start.as_ref() { if start.elapsed() >= apply_ctx.yield_duration { From 9d037753036c55ad5d9f5865f308927340271d5d Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Mon, 4 Jan 2021 23:33:33 +0800 Subject: [PATCH 10/13] Add store_io_min_interval_us Signed-off-by: Liu Cong --- components/raftstore/src/lib.rs | 2 ++ components/raftstore/src/store/config.rs | 6 +++++ components/raftstore/src/store/fsm/store.rs | 29 +++++++++++---------- 3 files changed, 23 insertions(+), 14 deletions(-) diff --git a/components/raftstore/src/lib.rs b/components/raftstore/src/lib.rs index e74bae5fe7b..846262bfd2d 100644 --- a/components/raftstore/src/lib.rs +++ b/components/raftstore/src/lib.rs @@ -6,6 +6,8 @@ #![feature(div_duration)] #![feature(min_specialization)] #![feature(box_patterns)] +#![feature(duration_saturating_ops)] +#![feature(duration_zero)] #[macro_use] extern crate bitflags; diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index ed91d223406..b27ec2941c4 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -31,6 +31,8 @@ pub struct Config { #[config(skip)] pub delay_sync_us: u64, #[config(skip)] + pub store_io_min_interval_us: u64, + #[config(skip)] pub store_io_pool_size: u64, #[config(skip)] pub store_io_queue: u64, @@ -202,6 +204,7 @@ impl Default for Config { let split_size = ReadableSize::mb(coprocessor::config::SPLIT_SIZE_MB); Config { delay_sync_us: 0, + store_io_min_interval_us: 500, store_io_pool_size: 2, store_io_queue: 1, apply_io_size: 0, @@ -430,6 +433,9 @@ impl Config { CONFIG_RAFTSTORE_GAUGE 
.with_label_values(&["delay_sync_us"]) .set((self.delay_sync_us as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_min_interval_us"]) + .set((self.store_io_min_interval_us as i32).into()); CONFIG_RAFTSTORE_GAUGE .with_label_values(&["store_io_pool_size"]) .set((self.store_io_pool_size as i32).into()); diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index bf0c0d1af7e..409b8dcf6d0 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -94,7 +94,7 @@ where engine: ER, router: RaftRouter, tag: String, - buff_size: usize, + io_min_interval: Duration, tasks: Arc>>>, workers: Arc>>>, } @@ -109,7 +109,7 @@ where engine: self.engine.clone(), router: self.router.clone(), tag: self.tag.clone(), - buff_size: self.buff_size, + io_min_interval: self.io_min_interval.clone(), tasks: self.tasks.clone(), workers: self.workers.clone(), } @@ -121,7 +121,8 @@ where EK: KvEngine, ER: RaftEngine, { - pub fn new(engine: ER, router: RaftRouter, tag: String, pool_size: usize, buff_size: usize) -> AsyncWriter { + pub fn new(engine: ER, router: RaftRouter, tag: String, + pool_size: usize, io_min_interval_us: u64) -> AsyncWriter { let mut tasks = VecDeque::default(); for _ in 0..(pool_size + 1) { tasks.push_back( @@ -135,7 +136,7 @@ where engine, router, tag, - buff_size, + io_min_interval: Duration::from_micros(io_min_interval_us), tasks: Arc::new(Mutex::new(tasks)), workers: Arc::new(Mutex::new(vec![])), }; @@ -149,20 +150,20 @@ where let t = thread::Builder::new() .name(thd_name!(format!("raftdb-async-writer-{}", i))) .spawn(move || { - let mut sleep = false; + let mut last_ts = TiInstant::now_coarse(); loop { - // TODO: block if no data in current raft_wb - if sleep { - let d = Duration::from_millis(1); - thread::sleep(d); + let mut now_ts = TiInstant::now_coarse(); + let delta = (now_ts - last_ts).saturating_sub(x.io_min_interval); + if !delta.is_zero() { + thread::sleep(delta); + now_ts = TiInstant::now_coarse(); } - sleep = false; - let now = TiInstant::now_coarse(); + last_ts = now_ts; + // TODO: block if too many data in current raft_wb let task = { let mut tasks = x.tasks.lock().unwrap(); if tasks.front().unwrap().unsynced_readies.is_empty() { - sleep = true; continue; } tasks.pop_front().unwrap() @@ -176,7 +177,7 @@ where } STORE_WRITE_RAFTDB_TICK_DURATION_HISTOGRAM - .observe(duration_to_sec(now.elapsed()) as f64); + .observe(duration_to_sec(now_ts.elapsed()) as f64); } }) .unwrap(); @@ -1407,7 +1408,7 @@ impl RaftBatchSystem { ); let async_writer = Arc::new(Mutex::new(AsyncWriter::new(engines.raft.clone(), self.router.clone(), "raftstore-async-writer".to_string(), - cfg.value().store_io_pool_size as usize, cfg.value().store_io_queue as usize))); + cfg.value().store_io_pool_size as usize, cfg.value().store_io_min_interval_us))); let mut builder = RaftPollerBuilder { cfg, store: meta, From 9b65f6d428fe2ac2f4333ec460157be4d1541703 Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Tue, 5 Jan 2021 22:18:08 +0800 Subject: [PATCH 11/13] Fixing corrution Signed-off-by: Liu Cong --- .../raftstore/src/store/peer_storage.rs | 35 +++++++------------ 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/components/raftstore/src/store/peer_storage.rs b/components/raftstore/src/store/peer_storage.rs index 2a02fcc17b8..d88d1824ab6 100644 --- a/components/raftstore/src/store/peer_storage.rs +++ b/components/raftstore/src/store/peer_storage.rs @@ -1025,13 +1025,11 @@ where // to the 
From 9b65f6d428fe2ac2f4333ec460157be4d1541703 Mon Sep 17 00:00:00 2001
From: Liu Cong
Date: Tue, 5 Jan 2021 22:18:08 +0800
Subject: [PATCH 11/13] Fixing corruption

Signed-off-by: Liu Cong
---
 .../raftstore/src/store/peer_storage.rs | 35 +++++++------------
 1 file changed, 13 insertions(+), 22 deletions(-)

diff --git a/components/raftstore/src/store/peer_storage.rs b/components/raftstore/src/store/peer_storage.rs
index 2a02fcc17b8..d88d1824ab6 100644
--- a/components/raftstore/src/store/peer_storage.rs
+++ b/components/raftstore/src/store/peer_storage.rs
@@ -1025,13 +1025,11 @@ where
     // to the return one.
     // WARNING: If this function returns error, the caller must panic otherwise the entry cache may
     // be wrong and break correctness.
-    pub fn append>(
+    pub fn append(
         &mut self,
         invoke_ctx: &mut InvokeContext,
         entries: Vec,
-        ready_ctx: &mut H,
-        ready_number: u64,
-        region_notifier: Arc,
+        raft_wb: &mut ER::LogBatch,
     ) -> Result {
         let region_id = self.get_region_id();
         debug!(
@@ -1059,17 +1057,10 @@ where
             cache.append(&self.tag, &entries);
         }
 
-        {
-            let raft_wb_pool = ready_ctx.raft_wb_pool();
-            let mut raft_wbs = raft_wb_pool.lock().unwrap();
-            let current = raft_wbs.front_mut().unwrap();
-            current.wb.append(region_id, entries)?;
-            current.unsynced_readies.insert(region_id,
-                UnsyncedReady{number: ready_number, region_id, notifier: region_notifier, version: 0});
-            // Delete any previously appended log entries which never committed.
-            // TODO: Wrap it as an engine::Error.
-            current.wb.cut_logs(region_id, last_index + 1, prev_last_index);
-        }
+        raft_wb.append(region_id, entries)?;
+        // Delete any previously appended log entries which never committed.
+        // TODO: Wrap it as an engine::Error.
+        raft_wb.cut_logs(region_id, last_index + 1, prev_last_index);
 
         invoke_ctx.raft_state.set_last_index(last_index);
         invoke_ctx.last_term = last_term;
@@ -1406,8 +1397,13 @@ where
             last_index(&ctx.raft_state)
         };
 
+        let (_, raft_wb_pool) = ready_ctx.wb_mut();
+        let mut raft_wbs = raft_wb_pool.lock().unwrap();
+        let current = raft_wbs.front_mut().unwrap();
+        current.unsynced_readies.insert(region_id,
+            UnsyncedReady{number: ready.number(), region_id, notifier: region_notifier.clone(), version: 0});
         if !ready.entries().is_empty() {
-            self.append(&mut ctx, ready.take_entries(), ready_ctx, ready.number(), region_notifier.clone())?;
+            self.append(&mut ctx, ready.take_entries(), &mut current.wb)?;
         }
 
         // Last index is 0 means the peer is created from raft message
@@ -1420,12 +1416,7 @@ where
 
         // Save raft state if it has changed or there is a snapshot.
         if ctx.raft_state != self.raft_state || snapshot_index > 0 {
-            {
-                let raft_wb_pool = ready_ctx.raft_wb_pool();
-                let mut raft_wbs = raft_wb_pool.lock().unwrap();
-                let current = raft_wbs.front_mut().unwrap();
-                ctx.save_raft_state_to(&mut current.wb)?;
-            }
+            ctx.save_raft_state_to(&mut current.wb)?;
             if snapshot_index > 0 {
                 // in case of restart happen when we just write region state to Applying,
                 // but not write raft_local_state to raft rocksdb in time.
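PATCH 11 narrows append() to a plain log batch and hoists the UnsyncedReady registration into the ready-handling path, where the write-batch pool is locked once and all work goes against the front task. A toy sketch of that pool discipline under simplified types (String for the log batch, bare integers for ready numbers; none of these are TiKV's definitions):

use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};

// Simplified stand-in for the pooled write task: a batch plus the readies
// that must be acknowledged once this batch is synced.
struct Task {
    wb: Vec<String>,                     // stand-in for ER::LogBatch
    unsynced_readies: HashMap<u64, u64>, // region_id -> ready number
}

fn main() {
    let tasks = Arc::new(Mutex::new(VecDeque::from(vec![
        Task { wb: vec![], unsynced_readies: HashMap::new() },
        Task { wb: vec![], unsynced_readies: HashMap::new() },
    ])));

    // Poller side: under one lock, fill the front task's batch and record
    // the ready that depends on it.
    {
        let mut q = tasks.lock().unwrap();
        let current = q.front_mut().unwrap();
        current.wb.push("entries for region 1".to_string());
        current.unsynced_readies.insert(1, 42);
    }

    // Writer side: take the front task only if it has pending readies,
    // leaving an empty task to keep accumulating new writes.
    let task = {
        let mut q = tasks.lock().unwrap();
        if q.front().unwrap().unsynced_readies.is_empty() {
            None
        } else {
            q.pop_front()
        }
    };
    assert!(task.is_some());
}

Filling the batch and recording the ready under the same lock keeps a ready number and its log entries in the same task, which is the invariant the writer thread relies on when it syncs and then notifies.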
From 0e0bc0e958927141435fa60b910c84fbdc49149d Mon Sep 17 00:00:00 2001
From: Liu Cong
Date: Tue, 5 Jan 2021 23:35:31 +0800
Subject: [PATCH 12/13] Fixed corruption

Signed-off-by: Liu Cong
---
 .../raftstore/src/store/peer_storage.rs | 68 +++++++++----------
 1 file changed, 32 insertions(+), 36 deletions(-)

diff --git a/components/raftstore/src/store/peer_storage.rs b/components/raftstore/src/store/peer_storage.rs
index d88d1824ab6..fc46edbc882 100644
--- a/components/raftstore/src/store/peer_storage.rs
+++ b/components/raftstore/src/store/peer_storage.rs
@@ -1380,54 +1380,50 @@ where
     ) -> Result {
         let region_id = self.get_region_id();
         let mut ctx = InvokeContext::new(self);
-        let snapshot_index = if ready.snapshot().is_empty() {
-            0
-        } else {
-            fail_point!("raft_before_apply_snap");
-            let (kv_wb, raft_wb_pool) = ready_ctx.wb_mut();
+        let mut snapshot_index = 0;
+
+        {
+            let raft_wb_pool = ready_ctx.raft_wb_pool();
             let mut raft_wbs = raft_wb_pool.lock().unwrap();
             let current = raft_wbs.front_mut().unwrap();
-            self.apply_snapshot(&mut ctx, ready.snapshot(), kv_wb, &mut current.wb, &destroy_regions)?;
-            current.unsynced_readies.insert(region_id,
-                UnsyncedReady{number: ready.number(), region_id, notifier: region_notifier.clone(), version: 0});
-            fail_point!("raft_after_apply_snap");
-            ctx.destroyed_regions = destroy_regions;
-
-            last_index(&ctx.raft_state)
-        };
+            if !ready.snapshot().is_empty() {
+                fail_point!("raft_before_apply_snap");
+                self.apply_snapshot(&mut ctx, ready.snapshot(), ready_ctx.kv_wb_mut(), &mut current.wb, &destroy_regions)?;
+                fail_point!("raft_after_apply_snap");
+                ctx.destroyed_regions = destroy_regions;
+                snapshot_index = last_index(&ctx.raft_state);
+            };
-        let (_, raft_wb_pool) = ready_ctx.wb_mut();
-        let mut raft_wbs = raft_wb_pool.lock().unwrap();
-        let current = raft_wbs.front_mut().unwrap();
-        current.unsynced_readies.insert(region_id,
-            UnsyncedReady{number: ready.number(), region_id, notifier: region_notifier.clone(), version: 0});
-        if !ready.entries().is_empty() {
-            self.append(&mut ctx, ready.take_entries(), &mut current.wb)?;
-        }
+            if !ready.entries().is_empty() {
+                self.append(&mut ctx, ready.take_entries(), &mut current.wb)?;
+            }
+            // Last index is 0 means the peer is created from raft message
+            // and has not applied snapshot yet, so skip persistent hard state.
+            if ctx.raft_state.get_last_index() > 0 {
+                if let Some(hs) = ready.hs() {
+                    ctx.raft_state.set_hard_state(hs.clone());
+                }
+            }
-        // Last index is 0 means the peer is created from raft message
-        // and has not applied snapshot yet, so skip persistent hard state.
-        if ctx.raft_state.get_last_index() > 0 {
-            if let Some(hs) = ready.hs() {
-                ctx.raft_state.set_hard_state(hs.clone());
+            // Save raft state if it has changed or there is a snapshot.
+            if ctx.raft_state != self.raft_state || snapshot_index > 0 {
+                ctx.save_raft_state_to(&mut current.wb)?;
             }
-        }
-        // Save raft state if it has changed or there is a snapshot.
-        if ctx.raft_state != self.raft_state || snapshot_index > 0 {
-            ctx.save_raft_state_to(&mut current.wb)?;
-            if snapshot_index > 0 {
-                // in case of restart happen when we just write region state to Applying,
-                // but not write raft_local_state to raft rocksdb in time.
-                // we write raft state to default rocksdb, with last index set to snap index,
-                // in case of recv raft log after snapshot.
-                ctx.save_snapshot_raft_state_to(snapshot_index, ready_ctx.kv_wb_mut())?;
+            if ready.must_sync() {
+                current.unsynced_readies.insert(region_id,
+                    UnsyncedReady{number: ready.number(), region_id, notifier: region_notifier.clone(), version: 0});
             }
         }
         // only when apply snapshot
         if snapshot_index > 0 {
+            // in case of restart happens when we just write region state to Applying,
+            // but not write raft_local_state to raft rocksdb in time.
+            // we write raft state to default rocksdb, with last index set to snap index,
+            // in case of recv raft log after snapshot.
+            ctx.save_snapshot_raft_state_to(snapshot_index, ready_ctx.kv_wb_mut())?;
             ctx.save_apply_state_to(ready_ctx.kv_wb_mut())?;
         }
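Per its subject, PATCH 12 fixes the corruption in two ways: the whole ready-handling sequence now runs against a single locked front task, and an UnsyncedReady is registered only when ready.must_sync() indicates the ready actually requires an fsync before it may be reported as persisted. A small sketch of that gate with a simplified Ready stand-in (not raft-rs's type):

use std::collections::HashMap;

// Simplified stand-in for raft-rs's Ready; only the fields the gate needs.
struct Ready {
    number: u64,
    must_sync: bool,
}

// Record the ready only when it requires an fsync; readies that do not
// need syncing must not wait on (or be acknowledged by) the writer thread.
fn record_if_must_sync(unsynced: &mut HashMap<u64, u64>, region_id: u64, ready: &Ready) {
    if ready.must_sync {
        // insert() overwrites any older pending number for the region,
        // matching the map-keyed-by-region semantics above.
        unsynced.insert(region_id, ready.number);
    }
}

fn main() {
    let mut unsynced = HashMap::new();
    record_if_must_sync(&mut unsynced, 1, &Ready { number: 7, must_sync: true });
    record_if_must_sync(&mut unsynced, 2, &Ready { number: 3, must_sync: false });
    assert_eq!(unsynced.len(), 1); // only the must-sync ready is tracked
}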
From c6c40263b608fcdcaa41fd5a07cf1469bef2a08d Mon Sep 17 00:00:00 2001
From: gengliqi
Date: Mon, 18 Jan 2021 19:06:25 +0800
Subject: [PATCH 13/13] turn on entry cache

Signed-off-by: gengliqi
---
 components/raftstore/src/store/peer_storage.rs | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/components/raftstore/src/store/peer_storage.rs b/components/raftstore/src/store/peer_storage.rs
index fc46edbc882..8fff63dfcbf 100644
--- a/components/raftstore/src/store/peer_storage.rs
+++ b/components/raftstore/src/store/peer_storage.rs
@@ -694,11 +694,11 @@ where
         let last_term = init_last_term(&engines, region, &raft_state, &apply_state)?;
         let applied_index_term = init_applied_index_term(&engines, region, &apply_state)?;
 
-        let cache = if engines.raft.has_builtin_entry_cache() {
+        /*let cache = if engines.raft.has_builtin_entry_cache() {
             None
         } else {
             Some(EntryCache::default())
-        };
+        };*/
 
         Ok(PeerStorage {
             engines,
@@ -713,7 +713,7 @@ where
             tag,
             applied_index_term,
             last_term,
-            cache,
+            cache: Some(EntryCache::default()),
         })
     }
 
@@ -1083,11 +1083,11 @@ where
     }
 
     pub fn maybe_gc_cache(&mut self, replicated_idx: u64, apply_idx: u64) {
-        if self.engines.raft.has_builtin_entry_cache() {
+        /*if self.engines.raft.has_builtin_entry_cache() {
             let rid = self.get_region_id();
            self.engines.raft.gc_entry_cache(rid, apply_idx + 1);
             return;
-        }
+        }*/
 
         let cache = self.cache.as_mut().unwrap();
         if replicated_idx == apply_idx {
@@ -1193,9 +1193,10 @@ where
     ) -> Result<()> {
         let region_id = self.get_region_id();
         clear_meta(&self.engines, kv_wb, raft_wb, region_id, &self.raft_state)?;
-        if !self.engines.raft.has_builtin_entry_cache() {
+        /*if !self.engines.raft.has_builtin_entry_cache() {
             self.cache = Some(EntryCache::default());
-        }
+        }*/
+        self.cache = Some(EntryCache::default());
         Ok(())
     }
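PATCH 13 bypasses the raft engine's builtin entry cache and always keeps raftstore's own EntryCache, so recently appended entries are served from memory regardless of the engine backend. As a rough illustration of what such a cache provides, here is a toy version with String entries and contiguous indices; TiKV's real EntryCache is considerably more involved:

use std::collections::VecDeque;

// Toy entry cache: a contiguous window of recent entries starting at
// `first_index`. Stand-in types only; not TiKV's EntryCache.
#[derive(Default)]
struct EntryCache {
    first_index: u64,
    entries: VecDeque<String>, // stand-in for eraftpb::Entry
}

impl EntryCache {
    fn append(&mut self, index: u64, entry: String) {
        if self.entries.is_empty() {
            self.first_index = index;
        }
        self.entries.push_back(entry);
    }

    // Drop everything up to and including `applied` once those entries are
    // both applied and replicated, mirroring maybe_gc_cache's fast path.
    fn compact_to(&mut self, applied: u64) {
        while self.first_index <= applied && !self.entries.is_empty() {
            self.entries.pop_front();
            self.first_index += 1;
        }
    }
}

fn main() {
    let mut cache = EntryCache::default();
    cache.append(5, "e5".into());
    cache.append(6, "e6".into());
    cache.compact_to(5);
    assert_eq!(cache.entries.len(), 1); // only entry 6 remains cached
}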