From 0dfefacd894bc1c8d17a9a59aca57f3e7169e604 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Mon, 20 May 2019 11:30:41 +0800 Subject: [PATCH] apply: rename wb to kv_wb (#4721) Signed-off-by: Neil Shen --- src/raftstore/store/fsm/apply.rs | 98 ++++++++++++++++---------------- 1 file changed, 49 insertions(+), 49 deletions(-) diff --git a/src/raftstore/store/fsm/apply.rs b/src/raftstore/store/fsm/apply.rs index a7595295cd7..c4f50d6b466 100644 --- a/src/raftstore/store/fsm/apply.rs +++ b/src/raftstore/store/fsm/apply.rs @@ -286,9 +286,9 @@ struct ApplyContext { apply_res: Vec, exec_ctx: Option, - wb: Option, - wb_last_bytes: u64, - wb_last_keys: u64, + kv_wb: Option, + kv_wb_last_bytes: u64, + kv_wb_last_keys: u64, last_applied_index: u64, committed_count: usize, @@ -321,11 +321,11 @@ impl ApplyContext { engines, router, notifier, - wb: None, + kv_wb: None, cbs: MustConsumeVec::new("callback of apply context"), apply_res: vec![], - wb_last_bytes: 0, - wb_last_keys: 0, + kv_wb_last_bytes: 0, + kv_wb_last_keys: 0, last_applied_index: 0, committed_count: 0, enable_sync_log: cfg.sync_log, @@ -341,10 +341,10 @@ impl ApplyContext { /// `prepare_for` -> `commit` [-> `commit` ...] -> `finish_for`. /// After all delegates are handled, `write_to_db` method should be called. pub fn prepare_for(&mut self, delegate: &ApplyDelegate) { - if self.wb.is_none() { - self.wb = Some(WriteBatch::with_capacity(DEFAULT_APPLY_WB_SIZE)); - self.wb_last_bytes = 0; - self.wb_last_keys = 0; + if self.kv_wb.is_none() { + self.kv_wb = Some(WriteBatch::with_capacity(DEFAULT_APPLY_WB_SIZE)); + self.kv_wb_last_bytes = 0; + self.kv_wb_last_keys = 0; } self.cbs.push(ApplyCallback::new(delegate.region.clone())); self.last_applied_index = delegate.apply_state.get_applied_index(); @@ -356,7 +356,7 @@ impl ApplyContext { /// This call is valid only when it's between a `prepare_for` and `finish_for`. pub fn commit(&mut self, delegate: &mut ApplyDelegate) { if self.last_applied_index < delegate.apply_state.get_applied_index() { - delegate.write_apply_state(&self.engines, self.wb.as_mut().unwrap()); + delegate.write_apply_state(&self.engines, self.kv_wb.as_mut().unwrap()); } // last_applied_index doesn't need to be updated, set persistent to true will // force it call `prepare_for` automatically. @@ -369,32 +369,32 @@ impl ApplyContext { self.write_to_db(); self.prepare_for(delegate); } - self.wb_last_bytes = self.wb().data_size() as u64; - self.wb_last_keys = self.wb().count() as u64; + self.kv_wb_last_bytes = self.kv_wb().data_size() as u64; + self.kv_wb_last_keys = self.kv_wb().count() as u64; } /// Writes all the changes into RocksDB. pub fn write_to_db(&mut self) { - if self.wb.as_ref().map_or(false, |wb| !wb.is_empty()) { + if self.kv_wb.as_ref().map_or(false, |wb| !wb.is_empty()) { let mut write_opts = WriteOptions::new(); write_opts.set_sync(self.enable_sync_log && self.sync_log_hint); self.engines .kv - .write_opt(self.wb(), &write_opts) + .write_opt(self.kv_wb(), &write_opts) .unwrap_or_else(|e| { panic!("failed to write to engine: {:?}", e); }); self.sync_log_hint = false; - let data_size = self.wb().data_size(); + let data_size = self.kv_wb().data_size(); if data_size > APPLY_WB_SHRINK_SIZE { // Control the memory usage for the WriteBatch. - self.wb = Some(WriteBatch::with_capacity(DEFAULT_APPLY_WB_SIZE)); + self.kv_wb = Some(WriteBatch::with_capacity(DEFAULT_APPLY_WB_SIZE)); } else { // Clear data, reuse the WriteBatch, this can reduce memory allocations and deallocations. - self.wb().clear(); + self.kv_wb().clear(); } - self.wb_last_bytes = 0; - self.wb_last_keys = 0; + self.kv_wb_last_bytes = 0; + self.kv_wb_last_keys = 0; } for cbs in self.cbs.drain(..) { cbs.invoke_all(&self.host); @@ -404,7 +404,7 @@ impl ApplyContext { /// Finishes `Apply`s for the delegate. pub fn finish_for(&mut self, delegate: &mut ApplyDelegate, results: VecDeque) { if !delegate.pending_remove { - delegate.write_apply_state(&self.engines, self.wb.as_mut().unwrap()); + delegate.write_apply_state(&self.engines, self.kv_wb.as_mut().unwrap()); } self.commit_opt(delegate, false); self.apply_res.push(ApplyRes { @@ -418,21 +418,21 @@ impl ApplyContext { } pub fn delta_bytes(&self) -> u64 { - self.wb().data_size() as u64 - self.wb_last_bytes + self.kv_wb().data_size() as u64 - self.kv_wb_last_bytes } pub fn delta_keys(&self) -> u64 { - self.wb().count() as u64 - self.wb_last_keys + self.kv_wb().count() as u64 - self.kv_wb_last_keys } #[inline] - pub fn wb(&self) -> &WriteBatch { - self.wb.as_ref().unwrap() + pub fn kv_wb(&self) -> &WriteBatch { + self.kv_wb.as_ref().unwrap() } #[inline] - pub fn wb_mut(&mut self) -> &mut WriteBatch { - self.wb.as_mut().unwrap() + pub fn kv_kv_wb_mut(&mut self) -> &mut WriteBatch { + self.kv_wb.as_mut().unwrap() } pub fn flush(&mut self) { @@ -508,7 +508,7 @@ pub fn notify_stale_req(term: u64, cb: Callback) { } /// Checks if a write is needed to be issued before handling the command. -fn should_write_to_engine(cmd: &RaftCmdRequest, wb_keys: usize) -> bool { +fn should_write_to_engine(cmd: &RaftCmdRequest, kv_wb_keys: usize) -> bool { if cmd.has_admin_request() { match cmd.get_admin_request().get_cmd_type() { // ComputeHash require an up to date snapshot. @@ -522,7 +522,7 @@ fn should_write_to_engine(cmd: &RaftCmdRequest, wb_keys: usize) -> bool { // When write batch contains more than `recommended` keys, write the batch // to engine. - if wb_keys >= WRITE_BATCH_MAX_KEYS { + if kv_wb_keys >= WRITE_BATCH_MAX_KEYS { return true; } @@ -777,7 +777,7 @@ impl ApplyDelegate { if !data.is_empty() { let cmd = util::parse_data_at(data, index, &self.tag); - if should_write_to_engine(&cmd, apply_ctx.wb().count()) { + if should_write_to_engine(&cmd, apply_ctx.kv_wb().count()) { apply_ctx.commit(self); } @@ -913,15 +913,15 @@ impl ApplyDelegate { assert!(!self.pending_remove); ctx.exec_ctx = Some(self.new_ctx(index, term)); - ctx.wb_mut().set_save_point(); + ctx.kv_kv_wb_mut().set_save_point(); let (resp, exec_result) = match self.exec_raft_cmd(ctx, req) { Ok(a) => { - ctx.wb_mut().pop_save_point().unwrap(); + ctx.kv_kv_wb_mut().pop_save_point().unwrap(); a } Err(e) => { // clear dirty values. - ctx.wb_mut().rollback_to_save_point().unwrap(); + ctx.kv_kv_wb_mut().rollback_to_save_point().unwrap(); match e { Error::EpochNotMatch(..) => debug!( "epoch not match"; @@ -1151,7 +1151,7 @@ impl ApplyDelegate { } // TODO: check whether cf exists or not. rocks::util::get_cf_handle(&ctx.engines.kv, cf) - .and_then(|handle| ctx.wb().put_cf(handle, &key, value).map_err(Into::into)) + .and_then(|handle| ctx.kv_wb().put_cf(handle, &key, value).map_err(Into::into)) .unwrap_or_else(|e| { panic!( "{} failed to write ({}, {}) to cf {}: {:?}", @@ -1163,7 +1163,7 @@ impl ApplyDelegate { ) }); } else { - ctx.wb().put(&key, value).unwrap_or_else(|e| { + ctx.kv_wb().put(&key, value).unwrap_or_else(|e| { panic!( "{} failed to write ({}, {}): {:?}", self.tag, @@ -1189,7 +1189,7 @@ impl ApplyDelegate { let cf = req.get_delete().get_cf(); // TODO: check whether cf exists or not. rocks::util::get_cf_handle(&ctx.engines.kv, cf) - .and_then(|handle| ctx.wb().delete_cf(handle, &key).map_err(Into::into)) + .and_then(|handle| ctx.kv_wb().delete_cf(handle, &key).map_err(Into::into)) .unwrap_or_else(|e| { panic!("{} failed to delete {}: {:?}", self.tag, escape(&key), e) }); @@ -1201,7 +1201,7 @@ impl ApplyDelegate { self.metrics.delete_keys_hint += 1; } } else { - ctx.wb().delete(&key).unwrap_or_else(|e| { + ctx.kv_wb().delete(&key).unwrap_or_else(|e| { panic!("{} failed to delete {}: {:?}", self.tag, escape(&key), e) }); self.metrics.delete_keys_hint += 1; @@ -1497,8 +1497,8 @@ impl ApplyDelegate { } else { PeerState::Normal }; - let wb_mut = ctx.wb.as_mut().unwrap(); - if let Err(e) = write_peer_state(&ctx.engines.kv, wb_mut, ®ion, state, None) { + let kv_wb_mut = ctx.kv_wb.as_mut().unwrap(); + if let Err(e) = write_peer_state(&ctx.engines.kv, kv_wb_mut, ®ion, state, None) { panic!("{} failed to update region state: {:?}", self.tag, e); } @@ -1609,7 +1609,7 @@ impl ApplyDelegate { regions.push(derived.clone()); } let kv = &ctx.engines.kv; - let wb_mut = ctx.wb.as_mut().unwrap(); + let kv_wb_mut = ctx.kv_wb.as_mut().unwrap(); for req in split_reqs.get_requests() { let mut new_region = Region::new(); // TODO: check new region id validation. @@ -1625,8 +1625,8 @@ impl ApplyDelegate { { peer.set_id(*peer_id); } - write_peer_state(kv, wb_mut, &new_region, PeerState::Normal, None) - .and_then(|_| write_initial_apply_state(kv, wb_mut, new_region.get_id())) + write_peer_state(kv, kv_wb_mut, &new_region, PeerState::Normal, None) + .and_then(|_| write_initial_apply_state(kv, kv_wb_mut, new_region.get_id())) .unwrap_or_else(|e| { panic!( "{} fails to save split region {:?}: {:?}", @@ -1639,7 +1639,7 @@ impl ApplyDelegate { derived.set_start_key(keys.pop_front().unwrap()); regions.push(derived.clone()); } - write_peer_state(kv, wb_mut, &derived, PeerState::Normal, None).unwrap_or_else(|e| { + write_peer_state(kv, kv_wb_mut, &derived, PeerState::Normal, None).unwrap_or_else(|e| { panic!("{} fails to update region {:?}: {:?}", self.tag, derived, e) }); let mut resp = AdminResponse::new(); @@ -1692,7 +1692,7 @@ impl ApplyDelegate { merging_state.set_commit(exec_ctx.index); write_peer_state( &ctx.engines.kv, - ctx.wb.as_mut().unwrap(), + ctx.kv_wb.as_mut().unwrap(), ®ion, PeerState::Merging, Some(merging_state.clone()), @@ -1863,15 +1863,15 @@ impl ApplyDelegate { region.set_start_key(source_region.get_start_key().to_vec()); } let kv = &ctx.engines.kv; - let wb_mut = ctx.wb.as_mut().unwrap(); - write_peer_state(kv, wb_mut, ®ion, PeerState::Normal, None) + let kv_wb_mut = ctx.kv_wb.as_mut().unwrap(); + write_peer_state(kv, kv_wb_mut, ®ion, PeerState::Normal, None) .and_then(|_| { // TODO: maybe all information needs to be filled? let mut merging_state = MergeState::new(); merging_state.set_target(self.region.clone()); write_peer_state( kv, - wb_mut, + kv_wb_mut, source_region, PeerState::Tombstone, Some(merging_state), @@ -1924,8 +1924,8 @@ impl ApplyDelegate { // Update version to avoid duplicated rollback requests. region.mut_region_epoch().set_version(version + 1); let kv = &ctx.engines.kv; - let wb_mut = ctx.wb.as_mut().unwrap(); - write_peer_state(kv, wb_mut, ®ion, PeerState::Normal, None).unwrap_or_else(|e| { + let kv_wb_mut = ctx.kv_wb.as_mut().unwrap(); + write_peer_state(kv, kv_wb_mut, ®ion, PeerState::Normal, None).unwrap_or_else(|e| { panic!( "{} failed to rollback merge {:?}: {:?}", self.tag, rollback, e