raftstore: set is_merging flag after restart (tikv#5871)
Signed-off-by: gengliqi <gengliqiii@gmail.com>
gengliqi authored and sre-bot committed Nov 26, 2019
1 parent d7c8f7f commit 43dd558
Showing 3 changed files with 91 additions and 1 deletion.
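
What the change does: when an apply delegate is rebuilt from a Registration (for example after a node restarts), it used to be constructed with is_merging: false unconditionally. A peer that had already applied PrepareMerge would therefore come back up unaware that it is mid-merge, and the apply layer could go on to truncate raft log entries that the merge still needs. The commit adds is_merging to Registration and derives it from peer.pending_merge_state, so the flag survives restarts. The sketch below is a minimal, hypothetical illustration of the kind of guard that depends on such a flag; CompactLogGuard, allowed_compact_index, and restore_guard are invented names for this example, not TiKV's actual API (the real flag lives on ApplyDelegate in apply.rs).

    // Hypothetical sketch only; names are illustrative, not TiKV's API.
    struct CompactLogGuard {
        /// True after PrepareMerge has been applied and before the merge
        /// commits or rolls back.
        is_merging: bool,
    }

    impl CompactLogGuard {
        /// Returns the index up to which the raft log may be truncated,
        /// or `None` if truncation should be skipped for now.
        fn allowed_compact_index(&self, requested: u64) -> Option<u64> {
            if self.is_merging {
                // The merge may still need these entries, so skip compaction
                // until it finishes.
                return None;
            }
            Some(requested)
        }
    }

    // The essence of this commit: after a restart, rebuild the flag from the
    // persisted merge state instead of defaulting it to false.
    fn restore_guard(pending_merge: bool) -> CompactLogGuard {
        CompactLogGuard {
            is_merging: pending_merge,
        }
    }

    fn main() {
        // With the flag restored, a pending merge blocks log truncation.
        let guard = restore_guard(true);
        assert_eq!(guard.allowed_compact_index(100), None);
    }

The new failpoint test below exercises exactly this path: it restarts the node between applying PrepareMerge and applying the compact log, then checks that the merge can still finish.
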
6 changes: 5 additions & 1 deletion src/raftstore/store/fsm/apply.rs
@@ -644,7 +644,7 @@ impl ApplyDelegate {
             merged: false,
             ready_source_region_id: 0,
             wait_merge_state: None,
-            is_merging: false,
+            is_merging: reg.is_merging,
             pending_cmds: Default::default(),
             metrics: Default::default(),
             last_merge_version: 0,
@@ -2133,6 +2133,7 @@ pub struct Registration {
     pub apply_state: RaftApplyState,
     pub applied_index_term: u64,
     pub region: Region,
+    pub is_merging: bool,
 }
 
 impl Registration {
@@ -2143,6 +2144,7 @@ impl Registration {
             apply_state: peer.get_store().apply_state().clone(),
             applied_index_term: peer.get_store().applied_index_term(),
             region: peer.region().clone(),
+            is_merging: peer.pending_merge_state.is_some(),
         }
     }
 }
@@ -2368,6 +2370,8 @@ impl ApplyFsm {
             self.delegate.region_id() == 1000 && self.delegate.id() == 1003,
             |_| {}
         );
+        fail_point!("on_handle_apply", |_| {});
+
         if apply.entries.is_empty() || self.delegate.pending_remove || self.delegate.stopped {
             return;
         }
2 changes: 2 additions & 0 deletions src/raftstore/store/fsm/peer.rs
@@ -809,6 +809,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
     }
 
     fn on_apply_res(&mut self, res: ApplyTaskRes) {
+        fail_point!("on_apply_res", |_| {});
         match res {
             ApplyTaskRes::Apply(mut res) => {
                 debug!(
@@ -2428,6 +2429,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
             self.register_raft_gc_log_tick();
         }
         debug_assert!(!self.fsm.stopped);
+        fail_point!("on_raft_gc_log_tick", |_| {});
 
         // As leader, we would not keep caches for the peers that didn't response heartbeat in the
         // last few seconds. That happens probably because another TiKV is down. In this case if we
84 changes: 84 additions & 0 deletions tests/failpoints/cases/test_merge.rs
@@ -539,3 +539,87 @@ fn test_node_merge_multiple_snapshots(together: bool) {
     cluster.clear_send_filters();
     must_get_equal(&cluster.get_engine(3), b"k9", b"v9");
 }
+
+// Test whether a compact log is ignored after PrepareMerge has been applied and the node restarts,
+// i.e. the is_merging flag should be set again after restart.
+#[test]
+fn test_node_merge_restart_after_apply_premerge_before_apply_compact_log() {
+    let _guard = crate::setup();
+    let mut cluster = new_node_cluster(0, 3);
+    configure_for_merge(&mut cluster);
+    cluster.cfg.raft_store.merge_max_log_gap = 10;
+    cluster.cfg.raft_store.raft_log_gc_count_limit = 11;
+    // Rely on this config to trigger a compact log.
+    cluster.cfg.raft_store.raft_log_gc_size_limit = ReadableSize(1);
+    cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10);
+    cluster.run();
+    // Prevent the raft log gc tick from proposing a compact log.
+    let raft_gc_log_tick_fp = "on_raft_gc_log_tick";
+    fail::cfg(raft_gc_log_tick_fp, "return()").unwrap();
+    cluster.must_put(b"k1", b"v1");
+    cluster.must_put(b"k3", b"v3");
+
+    let pd_client = Arc::clone(&cluster.pd_client);
+    let region = pd_client.get_region(b"k1").unwrap();
+
+    cluster.must_split(&region, b"k2");
+
+    let left = pd_client.get_region(b"k1").unwrap();
+    let right = pd_client.get_region(b"k2").unwrap();
+    let left_peer_1 = find_peer(&left, 1).cloned().unwrap();
+    cluster.must_transfer_leader(left.get_id(), left_peer_1);
+
+    // Create a log gap between store 1 and store 3; this affects min_index in PrepareMerge.
+    cluster.add_send_filter(IsolationFilterFactory::new(3));
+    for i in 0..6 {
+        cluster.must_put(format!("k1{}", i).as_bytes(), b"v1");
+    }
+    // Prevent on_apply_res from updating merge_state in Peer;
+    // otherwise almost nothing could be proposed, including compact log.
+    let on_apply_res_fp = "on_apply_res";
+    fail::cfg(on_apply_res_fp, "return()").unwrap();
+
+    let merge = new_prepare_merge(right.clone());
+    let req = new_admin_request(left.get_id(), left.get_region_epoch(), merge);
+    let resp = cluster
+        .call_command_on_leader(req, Duration::from_secs(3))
+        .unwrap();
+    if resp.get_header().has_error() {
+        panic!("response {:?} has error", resp);
+    }
+    cluster.clear_send_filters();
+    // Prevent the apply FSM from applying the compact log.
+    let handle_apply_fp = "on_handle_apply";
+    fail::cfg(handle_apply_fp, "return()").unwrap();
+
+    let state1 = cluster.truncated_state(left.get_id(), 1);
+    fail::remove(raft_gc_log_tick_fp);
+
+    // Give the compact log some time to be proposed and (possibly) committed.
+    sleep_ms(30);
+
+    cluster.shutdown();
+
+    fail::remove(handle_apply_fp);
+    fail::remove(on_apply_res_fp);
+    // Prevent the merge schedule tick from proposing CommitMerge.
+    let schedule_merge_fp = "on_schedule_merge";
+    fail::cfg(schedule_merge_fp, "return()").unwrap();
+
+    cluster.start().unwrap();
+
+    // Wait for the compact log to be applied.
+    for _ in 0..50 {
+        let state2 = cluster.truncated_state(left.get_id(), 1);
+        if state1.get_index() != state2.get_index() {
+            break;
+        }
+        sleep_ms(10);
+    }
+    // Merge can be scheduled now.
+    fail::remove(schedule_merge_fp);
+
+    // Propose a write to the left region and wait for the merge to succeed.
+    cluster.must_put(b"k123", b"v2");
+    must_get_equal(&cluster.get_engine(3), b"k123", b"v2");
+}
