diff --git a/benches/suites/raw_node.rs b/benches/suites/raw_node.rs index f0fc3346d..d894d2bc1 100644 --- a/benches/suites/raw_node.rs +++ b/benches/suites/raw_node.rs @@ -103,7 +103,10 @@ fn test_ready_raft_node(logger: &slog::Logger) -> RawNode { let mut node = quick_raw_node(logger); node.raft.become_candidate(); node.raft.become_leader(); - node.raft.raft_log.stable_to(1, 1); + let unstable = node.raft.raft_log.unstable_entries().to_vec(); + node.raft.raft_log.stable_entries(); + node.raft.raft_log.store.wl().append(&unstable).expect(""); + node.raft.on_persist_entries(1, 1); node.raft.commit_apply(1); let mut entries = vec![]; for i in 1..101 { @@ -114,11 +117,14 @@ fn test_ready_raft_node(logger: &slog::Logger) -> RawNode { e.term = 1; entries.push(e); } - let mut unstable_entries = entries.clone(); - node.raft.raft_log.store.wl().append(&entries).expect(""); - node.raft.raft_log.unstable.offset = 102; + let _ = node.raft.append_entry(&mut entries); + let unstable = node.raft.raft_log.unstable_entries().to_vec(); + node.raft.raft_log.stable_entries(); + node.raft.raft_log.store.wl().append(&unstable).expect(""); + node.raft.raft_log.stable_entries(); // This increases 'committed_index' to `last_index` because there is only one node in quorum. 
- let _ = node.raft.append_entry(&mut unstable_entries); + node.raft + .on_persist_entries(node.raft.raft_log.last_index(), 1); let mut snap = Snapshot::default(); snap.set_data(vec![0; 8 * 1024 * 1024]); @@ -128,6 +134,6 @@ fn test_ready_raft_node(logger: &slog::Logger) -> RawNode { node.raft.msgs.push(Message::default()); } // Force reverting committed index to provide us some entries to be stored from next `Ready` - node.raft.raft_log.committed = 101; + node.raft.raft_log.committed = 1; node } diff --git a/examples/five_mem_node/main.rs b/examples/five_mem_node/main.rs index a76b90c40..6ed6a2730 100644 --- a/examples/five_mem_node/main.rs +++ b/examples/five_mem_node/main.rs @@ -246,15 +246,22 @@ fn on_ready( // Get the `Ready` with `RawNode::ready` interface. let mut ready = raft_group.ready(); - // Persistent raft logs. It's necessary because in `RawNode::advance` we stabilize - // raft logs to the latest position. - if let Err(e) = store.wl().append(ready.entries()) { - error!( - logger, - "persist raft log fail: {:?}, need to retry or panic", e - ); - return; - } + let handle_messages = |msgs: Vec>| { + for vec_msg in msgs { + for msg in vec_msg { + let to = msg.to; + if mailboxes[&to].send(msg).is_err() { + error!( + logger, + "send raft message to {} fail, let Raft retry it", to + ); + } + } + } + }; + + // Send out the messages come from the node. + handle_messages(ready.take_messages()); // Apply the snapshot. It's necessary because in `RawNode::advance` we stabilize the snapshot. if *ready.snapshot() != Snapshot::default() { @@ -268,54 +275,62 @@ fn on_ready( } } - // Send out the messages come from the node. - for msg in ready.messages.drain(..) { - let to = msg.to; - if mailboxes[&to].send(msg).is_err() { - error!( - logger, - "send raft message to {} fail, let Raft retry it", to - ); - } - } - - // Apply all committed proposals. 
- if let Some(committed_entries) = ready.committed_entries.take() { - for entry in &committed_entries { - if entry.data.is_empty() { - // From new elected leaders. - continue; - } - if let EntryType::EntryConfChange = entry.get_entry_type() { - // For conf change messages, make them effective. - let mut cc = ConfChange::default(); - cc.merge_from_bytes(&entry.data).unwrap(); - let cs = raft_group.apply_conf_change(&cc).unwrap(); - store.wl().set_conf_state(cs); - } else { - // For normal proposals, extract the key-value pair and then - // insert them into the kv engine. - let data = str::from_utf8(&entry.data).unwrap(); - let reg = Regex::new("put ([0-9]+) (.+)").unwrap(); - if let Some(caps) = reg.captures(&data) { - kv_pairs.insert(caps[1].parse().unwrap(), caps[2].to_string()); + let mut handle_committed_entries = + |rn: &mut RawNode, committed_entries: Vec| { + for entry in committed_entries { + if entry.data.is_empty() { + // From new elected leaders. + continue; + } + if let EntryType::EntryConfChange = entry.get_entry_type() { + // For conf change messages, make them effective. + let mut cc = ConfChange::default(); + cc.merge_from_bytes(&entry.data).unwrap(); + let cs = rn.apply_conf_change(&cc).unwrap(); + store.wl().set_conf_state(cs); + } else { + // For normal proposals, extract the key-value pair and then + // insert them into the kv engine. + let data = str::from_utf8(&entry.data).unwrap(); + let reg = Regex::new("put ([0-9]+) (.+)").unwrap(); + if let Some(caps) = reg.captures(&data) { + kv_pairs.insert(caps[1].parse().unwrap(), caps[2].to_string()); + } + } + if rn.raft.state == StateRole::Leader { + // The leader should response to the clients, tell them if their proposals + // succeeded or not. 
+ let proposal = proposals.lock().unwrap().pop_front().unwrap(); + proposal.propose_success.send(true).unwrap(); } } - if raft_group.raft.state == StateRole::Leader { - // The leader should response to the clients, tell them if their proposals - // succeeded or not. - let proposal = proposals.lock().unwrap().pop_front().unwrap(); - proposal.propose_success.send(true).unwrap(); - } - } - if let Some(last_committed) = committed_entries.last() { - let mut s = store.wl(); - s.mut_hard_state().commit = last_committed.index; - s.mut_hard_state().term = last_committed.term; - } + }; + // Apply all committed entries. + handle_committed_entries(raft_group, ready.take_committed_entries()); + + // Persistent raft logs. It's necessary because in `RawNode::advance` we stabilize + // raft logs to the latest position. + if let Err(e) = store.wl().append(ready.entries()) { + error!( + logger, + "persist raft log fail: {:?}, need to retry or panic", e + ); + return; } + + if let Some(hs) = ready.hs() { + // Raft HardState changed, and we need to persist it. + store.wl().set_hardstate(hs.clone()); + } + // Call `RawNode::advance` interface to update position flags in the raft. - raft_group.advance(ready); + let mut light_rd = raft_group.advance(ready); + // Send out the messages. + handle_messages(light_rd.take_messages()); + // Apply all committed entries. + handle_committed_entries(raft_group, light_rd.take_committed_entries()); + // Advance the apply index. 
+ raft_group.advance_apply(); } fn example_config() -> Config { diff --git a/examples/single_mem_node/main.rs b/examples/single_mem_node/main.rs index c88beab78..b7c36d481 100644 --- a/examples/single_mem_node/main.rs +++ b/examples/single_mem_node/main.rs @@ -102,52 +102,33 @@ fn main() { } } -fn on_ready(r: &mut RawNode, cbs: &mut HashMap) { - if !r.has_ready() { +fn on_ready(raft_group: &mut RawNode, cbs: &mut HashMap) { + if !raft_group.has_ready() { return; } + let store = raft_group.raft.raft_log.store.clone(); - // The Raft is ready, we can do something now. - let mut ready = r.ready(); + // Get the `Ready` with `RawNode::ready` interface. + let mut ready = raft_group.ready(); - let is_leader = r.raft.leader_id == r.raft.id; - if is_leader { - // If the peer is leader, the leader can send messages to other followers ASAP. - let msgs = ready.messages.drain(..); - for _msg in msgs { - // Here we only have one peer, so can ignore this. + let handle_messages = |msgs: Vec>| { + for vec_msg in msgs { + for _msg in vec_msg { + // Send messages to other peers. + } } - } + }; + + // Send out the messages come from the node. + handle_messages(ready.take_messages()); if !ready.snapshot().is_empty() { // This is a snapshot, we need to apply the snapshot at first. - r.mut_store() - .wl() - .apply_snapshot(ready.snapshot().clone()) - .unwrap(); - } - - if !ready.entries().is_empty() { - // Append entries to the Raft log - r.mut_store().wl().append(ready.entries()).unwrap(); - } - - if let Some(hs) = ready.hs() { - // Raft HardState changed, and we need to persist it. - r.mut_store().wl().set_hardstate(hs.clone()); - } - - if !is_leader { - // If not leader, the follower needs to reply the messages to - // the leader after appending Raft entries. - let msgs = ready.messages.drain(..); - for _msg in msgs { - // Send messages to other peers. 
- } + store.wl().apply_snapshot(ready.snapshot().clone()).unwrap(); } - if let Some(committed_entries) = ready.committed_entries.take() { - let mut _last_apply_index = 0; + let mut _last_apply_index = 0; + let mut handle_committed_entries = |committed_entries: Vec| { for entry in committed_entries { // Mostly, you need to save the last apply index to resume applying // after restart. Here we just ignore this because we use a Memory storage. @@ -166,10 +147,27 @@ fn on_ready(r: &mut RawNode, cbs: &mut HashMap) // TODO: handle EntryConfChange } + }; + handle_committed_entries(ready.take_committed_entries()); + + if !ready.entries().is_empty() { + // Append entries to the Raft log. + store.wl().append(&ready.entries()).unwrap(); + } + + if let Some(hs) = ready.hs() { + // Raft HardState changed, and we need to persist it. + store.wl().set_hardstate(hs.clone()); } - // Advance the Raft - r.advance(ready); + // Advance the Raft. + let mut light_rd = raft_group.advance(ready); + // Send out the messages. + handle_messages(light_rd.take_messages()); + // Apply all committed entries. + handle_committed_entries(light_rd.take_committed_entries()); + // Advance the apply index. + raft_group.advance_apply(); } fn send_propose(logger: Logger, sender: mpsc::Sender) { diff --git a/harness/src/interface.rs b/harness/src/interface.rs index b963ea0ce..1fb49703a 100644 --- a/harness/src/interface.rs +++ b/harness/src/interface.rs @@ -52,6 +52,26 @@ impl Interface { None => vec![], } } + + /// Persist the unstable snapshot and entries. 
+ pub fn persist(&mut self) { + if self.raft.is_some() { + if let Some(snapshot) = self.raft_log.unstable_snapshot() { + let snap = snapshot.clone(); + self.raft_log.stable_snap(); + let index = snap.get_metadata().index; + self.mut_store().wl().apply_snapshot(snap).expect(""); + self.commit_apply(index); + } + let unstable = self.raft_log.unstable_entries().to_vec(); + if !unstable.is_empty() { + self.raft_log.stable_entries(); + let last_entry = unstable.last().unwrap(); + self.mut_store().wl().append(&unstable).expect(""); + self.on_persist_entries(last_entry.index, last_entry.term); + } + } + } } impl From>> for Interface { diff --git a/harness/src/network.rs b/harness/src/network.rs index 1da2dc624..3620d1bca 100644 --- a/harness/src/network.rs +++ b/harness/src/network.rs @@ -167,6 +167,8 @@ impl Network { let resp = { let p = self.peers.get_mut(&m.to).unwrap(); let _ = p.step(m); + // The unstable data should be persisted before sending msg. + p.persist(); p.read_messages() }; new_msgs.append(&mut self.filter(resp)); diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index ff1712073..060a0261b 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -99,11 +99,11 @@ fn voted_with_config( // Persist committed index and fetch next entries. 
fn next_ents(r: &mut Raft, s: &MemStorage) -> Vec { - if let Some(entries) = r.raft_log.unstable_entries() { - s.wl().append(entries).expect(""); - } + let unstable = r.raft_log.unstable_entries().to_vec(); + r.raft_log.stable_entries(); + s.wl().append(&unstable).expect(""); let (last_idx, last_term) = (r.raft_log.last_index(), r.raft_log.last_term()); - r.raft_log.stable_to(last_idx, last_term); + r.on_persist_entries(last_idx, last_term); let ents = r.raft_log.next_entries(); r.commit_apply(r.raft_log.committed); ents.unwrap_or_else(Vec::new) @@ -301,6 +301,8 @@ fn test_progress_leader() { let mut raft = new_test_raft(1, vec![1, 2], 5, 1, new_storage(), &l); raft.become_candidate(); raft.become_leader(); + // For no-op entry + raft.persist(); raft.mut_prs().get_mut(2).unwrap().become_replicate(); let prop_msg = new_message(1, 1, MessageType::MsgPropose, 1); @@ -316,6 +318,7 @@ fn test_progress_leader() { assert_eq!(next_idx, matched + 1); assert!(raft.step(prop_msg.clone()).is_ok()); + raft.persist(); } } @@ -1480,7 +1483,8 @@ fn test_msg_append_response_wait_reset() { let mut sm = new_test_raft(1, vec![1, 2, 3], 5, 1, new_storage(), &l); sm.become_candidate(); sm.become_leader(); - + // For no-op entry + sm.persist(); // The new leader has just emitted a new Term 4 entry; consume those messages // from the outgoing queue. sm.bcast_append(); @@ -1498,6 +1502,7 @@ fn test_msg_append_response_wait_reset() { m = new_message(1, 0, MessageType::MsgPropose, 0); m.entries = vec![empty_entry(0, 0)].into(); sm.step(m).expect(""); + sm.persist(); // The command is broadcast to all nodes not in the wait state. // Node 2 left the wait state due to its MsgAppResp, but node 3 is still waiting. 
@@ -2682,8 +2687,9 @@ fn test_bcast_beat() { sm.become_candidate(); sm.become_leader(); for i in 0..10 { - let _ = sm.append_entry(&mut [empty_entry(0, i as u64 + 1)]); + let _ = sm.append_entry(&mut [empty_entry(0, offset + i + 1)]); } + sm.persist(); // slow follower let mut_pr = |sm: &mut Interface, n, matched, next_idx| { let m = sm.mut_prs().get_mut(n).unwrap(); @@ -2691,7 +2697,7 @@ fn test_bcast_beat() { m.next_idx = next_idx; }; // slow follower - mut_pr(&mut sm, 2, 5, 6); + mut_pr(&mut sm, 2, offset + 5, offset + 6); // normal follower let last_index = sm.raft_log.last_index(); mut_pr(&mut sm, 3, last_index, last_index + 1); @@ -2798,6 +2804,7 @@ fn test_leader_increase_next() { for (i, (state, next_idx, wnext)) in tests.drain(..).enumerate() { let mut sm = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l); sm.raft_log.append(&previous_ents); + sm.persist(); sm.become_candidate(); sm.become_leader(); sm.mut_prs().get_mut(2).unwrap().state = state; @@ -2973,6 +2980,7 @@ fn test_provide_snap() { let mut sm = new_test_raft(1, vec![1], 10, 1, new_storage(), &l); sm.restore(s); + sm.persist(); sm.become_candidate(); sm.become_leader(); @@ -2996,6 +3004,7 @@ fn test_ignore_providing_snapshot() { let s = new_snapshot(11, 11, vec![1, 2]); // magic number let mut sm = new_test_raft(1, vec![1], 10, 1, new_storage(), &l); sm.restore(s); + sm.persist(); sm.become_candidate(); sm.become_leader(); @@ -3127,6 +3136,7 @@ fn test_new_leader_pending_config() { if add_entry { e.set_entry_type(EntryType::EntryNormal); let _ = r.append_entry(&mut [e]); + r.persist(); } r.become_candidate(); r.become_leader(); @@ -3992,6 +4002,8 @@ fn test_learner_receive_snapshot() { let n2 = new_test_learner_raft(2, vec![1], vec![2], 10, 1, new_storage(), &l); n1.restore(s); + n1.persist(); + let committed = n1.raft_log.committed; n1.commit_apply(committed); @@ -4495,7 +4507,6 @@ fn prepare_request_snapshot() -> (Network, Snapshot) { .unwrap() .raft_log .unstable_entries() - 
.unwrap_or(&[]) .to_vec(); nt.storage[&1].wl().append(&ents).unwrap(); nt.storage[&1].wl().commit_to(14).unwrap(); diff --git a/harness/tests/integration_cases/test_raft_paper.rs b/harness/tests/integration_cases/test_raft_paper.rs index 102511a78..f027faace 100644 --- a/harness/tests/integration_cases/test_raft_paper.rs +++ b/harness/tests/integration_cases/test_raft_paper.rs @@ -34,13 +34,13 @@ pub fn commit_noop_entry(r: &mut Interface, s: &MemStorage) { } // ignore further messages to refresh followers' commit index r.read_messages(); - s.wl() - .append(r.raft_log.unstable_entries().unwrap_or(&[])) - .expect(""); + let unstable = r.raft_log.unstable_entries().to_vec(); + r.raft_log.stable_entries(); + s.wl().append(&unstable).expect(""); + let (last_index, last_term) = (r.raft_log.last_index(), r.raft_log.last_term()); + r.on_persist_entries(last_index, last_term); let committed = r.raft_log.committed; r.commit_apply(committed); - let (last_index, last_term) = (r.raft_log.last_index(), r.raft_log.last_term()); - r.raft_log.stable_to(last_index, last_term); } fn accept_and_reply(m: &Message) -> Message { @@ -451,7 +451,7 @@ fn test_leader_start_replication() { new_message_ext(1, 3, wents.clone().into()), ]; assert_eq!(msgs, expect_msgs); - assert_eq!(r.raft_log.unstable_entries(), Some(&*wents)); + assert_eq!(r.raft_log.unstable_entries(), &*wents); } // test_leader_commit_entry tests that when the entry has been safely replicated, @@ -472,6 +472,7 @@ fn test_leader_commit_entry() { let li = r.raft_log.last_index(); r.step(new_message(1, 1, MessageType::MsgPropose, 1)) .expect(""); + r.persist(); for m in r.read_messages() { r.step(accept_and_reply(&m)).expect(""); @@ -515,6 +516,7 @@ fn test_leader_acknowledge_commit() { let li = r.raft_log.last_index(); r.step(new_message(1, 1, MessageType::MsgPropose, 1)) .expect(""); + r.persist(); for m in r.read_messages() { if acceptors.contains_key(&m.to) && acceptors[&m.to] { @@ -557,6 +559,7 @@ fn 
test_leader_commit_preceding_entries() { r.step(new_message(1, 1, MessageType::MsgPropose, 1)) .expect(""); + r.persist(); for m in r.read_messages() { r.step(accept_and_reply(&m)).expect(""); @@ -615,6 +618,7 @@ fn test_follower_commit_entry() { m.commit = commit; m.entries = ents.clone().into(); r.step(m).expect(""); + r.persist(); if r.raft_log.committed != commit { panic!( @@ -751,12 +755,7 @@ fn test_follower_append_entries() { panic!("#{}: ents = {:?}, want {:?}", i, g, wents); } let g = r.raft_log.unstable_entries(); - let wunstable = if wunstable.is_empty() { - None - } else { - Some(&*wunstable) - }; - if g != wunstable { + if g != &*wunstable { panic!("#{}: unstable_entries = {:?}, want {:?}", i, g, wunstable); } } @@ -1030,6 +1029,7 @@ fn test_leader_only_commits_log_from_current_term() { // propose a entry to current term r.step(new_message(1, 1, MessageType::MsgPropose, 1)) .expect(""); + r.persist(); let mut m = new_message(2, 1, MessageType::MsgAppendResponse, 0); m.term = r.term; diff --git a/harness/tests/integration_cases/test_raft_snap.rs b/harness/tests/integration_cases/test_raft_snap.rs index 38e584ad8..3319e85e5 100644 --- a/harness/tests/integration_cases/test_raft_snap.rs +++ b/harness/tests/integration_cases/test_raft_snap.rs @@ -28,6 +28,7 @@ fn test_sending_snapshot_set_pending_snapshot() { let l = default_logger(); let mut sm = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l); sm.restore(testing_snap()); + sm.persist(); sm.become_candidate(); sm.become_leader(); @@ -51,6 +52,7 @@ fn test_pending_snapshot_pause_replication() { let l = default_logger(); let mut sm = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l); sm.restore(testing_snap()); + sm.persist(); sm.become_candidate(); sm.become_leader(); @@ -67,6 +69,7 @@ fn test_snapshot_failure() { let l = default_logger(); let mut sm = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l); sm.restore(testing_snap()); + sm.persist(); sm.become_candidate(); sm.become_leader(); 
@@ -88,6 +91,7 @@ fn test_snapshot_succeed() { let l = default_logger(); let mut sm = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l); sm.restore(testing_snap()); + sm.persist(); sm.become_candidate(); sm.become_leader(); @@ -109,6 +113,7 @@ fn test_snapshot_abort() { let l = default_logger(); let mut sm = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l); sm.restore(testing_snap()); + sm.persist(); sm.become_candidate(); sm.become_leader(); @@ -151,6 +156,7 @@ fn test_request_snapshot() { let l = default_logger(); let mut sm = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l); sm.restore(testing_snap()); + sm.persist(); // Raft can not step request snapshot if there is no leader. assert_eq!( diff --git a/harness/tests/integration_cases/test_raw_node.rs b/harness/tests/integration_cases/test_raw_node.rs index 0ee7b7bef..7eb4c66d4 100644 --- a/harness/tests/integration_cases/test_raw_node.rs +++ b/harness/tests/integration_cases/test_raw_node.rs @@ -31,22 +31,28 @@ fn conf_change(t: ConfChangeType, node_id: u64) -> ConfChange { cc } +#[allow(clippy::too_many_arguments)] fn must_cmp_ready( r: &Ready, ss: &Option, hs: &Option, entries: &[Entry], - committed_entries: Vec, + committed_entries: &[Entry], + snapshot: &Option, + msg_is_empty: bool, must_sync: bool, ) { assert_eq!(r.ss(), ss.as_ref()); assert_eq!(r.hs(), hs.as_ref()); - assert_eq!(r.entries(), entries); - assert_eq!(r.committed_entries, Some(committed_entries)); + assert_eq!(r.entries().as_slice(), entries); + assert_eq!(r.committed_entries().as_slice(), committed_entries); assert_eq!(r.must_sync(), must_sync); assert!(r.read_states().is_empty()); - assert_eq!(r.snapshot(), &Snapshot::default()); - assert!(r.messages.is_empty()); + assert_eq!( + r.snapshot(), + snapshot.as_ref().unwrap_or(&Snapshot::default()) + ); + assert_eq!(r.messages().is_empty(), msg_is_empty); } fn new_raw_node( @@ -250,21 +256,29 @@ fn test_raw_node_propose_and_conf_change() { // Propose the ConfChange, wait until 
it applies, save the resulting ConfState. let mut cs = None; while cs.is_none() { - let rd = raw_node.ready(); + let mut rd = raw_node.ready(); s.wl().append(rd.entries()).unwrap(); - for e in rd.committed_entries.as_ref().unwrap() { - if e.get_entry_type() == EntryType::EntryConfChange { - let mut cc = ConfChange::default(); - cc.merge_from_bytes(e.get_data()).unwrap(); - cs = Some(raw_node.apply_conf_change(&cc).unwrap()); - } else if e.get_entry_type() == EntryType::EntryConfChangeV2 { - let mut cc = ConfChangeV2::default(); - cc.merge_from_bytes(e.get_data()).unwrap(); - cs = Some(raw_node.apply_conf_change(&cc).unwrap()); - } - } + let mut handle_committed_entries = + |rn: &mut RawNode, committed_entries: Vec| { + for e in committed_entries { + if e.get_entry_type() == EntryType::EntryConfChange { + let mut cc = ConfChange::default(); + cc.merge_from_bytes(e.get_data()).unwrap(); + cs = Some(rn.apply_conf_change(&cc).unwrap()); + } else if e.get_entry_type() == EntryType::EntryConfChangeV2 { + let mut cc = ConfChangeV2::default(); + cc.merge_from_bytes(e.get_data()).unwrap(); + cs = Some(rn.apply_conf_change(&cc).unwrap()); + } + } + }; + handle_committed_entries(&mut raw_node, rd.take_committed_entries()); let is_leader = rd.ss().map_or(false, |ss| ss.leader_id == raw_node.raft.id); - raw_node.advance(rd); + + let mut light_rd = raw_node.advance(rd); + handle_committed_entries(&mut raw_node, light_rd.take_committed_entries()); + raw_node.advance_apply(); + // Once we are the leader, propose a command and a ConfChange. if !proposed && is_leader { raw_node.propose(vec![], b"somedata".to_vec()).unwrap(); @@ -328,7 +342,6 @@ fn test_raw_node_propose_and_conf_change() { raw_node.propose_conf_change(vec![], cc).unwrap(); rd = raw_node.ready(); } - // Check that the right ConfChange comes out. 
assert_eq!(rd.entries().len(), 1); assert_eq!( @@ -366,23 +379,31 @@ fn test_raw_node_joint_auto_leave() { // Propose the ConfChange, wait until it applies, save the resulting ConfState. let mut cs = None; while cs.is_none() { - let rd = raw_node.ready(); + let mut rd = raw_node.ready(); s.wl().append(rd.entries()).unwrap(); - for e in rd.committed_entries.as_ref().unwrap() { - if e.get_entry_type() == EntryType::EntryConfChangeV2 { - let mut cc = ConfChangeV2::default(); - cc.merge_from_bytes(e.get_data()).unwrap(); + let mut handle_committed_entries = + |rn: &mut RawNode, committed_entries: Vec| { + for e in committed_entries { + if e.get_entry_type() == EntryType::EntryConfChangeV2 { + let mut cc = ConfChangeV2::default(); + cc.merge_from_bytes(e.get_data()).unwrap(); + + // Force it step down. + let mut msg = new_message(1, 1, MessageType::MsgHeartbeatResponse, 0); + msg.term = rn.raft.term + 1; + rn.step(msg).unwrap(); + + cs = Some(rn.apply_conf_change(&cc).unwrap()); + } + } + }; + handle_committed_entries(&mut raw_node, rd.take_committed_entries()); + let is_leader = rd.ss().map_or(false, |ss| ss.leader_id == raw_node.raft.id); - // Force it step down. - let mut msg = new_message(1, 1, MessageType::MsgHeartbeatResponse, 0); - msg.term = raw_node.raft.term + 1; - raw_node.step(msg).unwrap(); + let mut light_rd = raw_node.advance(rd); + handle_committed_entries(&mut raw_node, light_rd.take_committed_entries()); + raw_node.advance_apply(); - cs = Some(raw_node.apply_conf_change(&cc).unwrap()); - } - } - let is_leader = rd.ss().map_or(false, |ss| ss.leader_id == raw_node.raft.id); - raw_node.advance(rd); // Once we are the leader, propose a command and a ConfChange. if !proposed && is_leader { raw_node.propose(vec![], b"somedata".to_vec()).unwrap(); @@ -409,12 +430,14 @@ fn test_raw_node_joint_auto_leave() { // Move the RawNode along. It should not leave joint because it's follower. 
let mut rd = raw_node.ready(); assert!(rd.entries().is_empty()); + let _ = raw_node.advance(rd); // Make it leader again. It should leave joint automatically after moving apply index. raw_node.campaign().unwrap(); rd = raw_node.ready(); s.wl().append(rd.entries()).unwrap(); - raw_node.advance(rd); + let _ = raw_node.advance(rd); + rd = raw_node.ready(); s.wl().append(rd.entries()).unwrap(); @@ -447,24 +470,31 @@ fn test_raw_node_propose_add_duplicate_node() { let rd = raw_node.ready(); s.wl().append(rd.entries()).expect(""); if rd.ss().map_or(false, |ss| ss.leader_id == raw_node.raft.id) { - raw_node.advance(rd); + let _ = raw_node.advance(rd); break; } - raw_node.advance(rd); + let _ = raw_node.advance(rd); } let mut propose_conf_change_and_apply = |cc| { raw_node.propose_conf_change(vec![], cc).expect(""); - let rd = raw_node.ready(); + let mut rd = raw_node.ready(); s.wl().append(rd.entries()).expect(""); - for e in rd.committed_entries.as_ref().unwrap() { - if e.get_entry_type() == EntryType::EntryConfChange { - let mut conf_change = ConfChange::default(); - conf_change.merge_from_bytes(&e.data).unwrap(); - raw_node.apply_conf_change(&conf_change).unwrap(); - } - } - raw_node.advance(rd); + let handle_committed_entries = + |rn: &mut RawNode, committed_entries: Vec| { + for e in committed_entries { + if e.get_entry_type() == EntryType::EntryConfChange { + let mut conf_change = ConfChange::default(); + conf_change.merge_from_bytes(&e.data).unwrap(); + rn.apply_conf_change(&conf_change).unwrap(); + } + } + }; + handle_committed_entries(&mut raw_node, rd.take_committed_entries()); + + let mut light_rd = raw_node.advance(rd); + handle_committed_entries(&mut raw_node, light_rd.take_committed_entries()); + raw_node.advance_apply(); }; let cc1 = conf_change(ConfChangeType::AddNode, 1); @@ -494,18 +524,18 @@ fn test_raw_node_propose_add_learner_node() -> Result<()> { let s = new_storage(); let mut raw_node = new_raw_node(1, vec![1], 10, 1, s.clone(), &l); let rd = 
raw_node.ready(); - s.wl().append(rd.entries()).expect(""); - raw_node.advance(rd); + must_cmp_ready(&rd, &None, &None, &[], &[], &None, true, false); + let _ = raw_node.advance(rd); raw_node.campaign().expect(""); loop { let rd = raw_node.ready(); s.wl().append(rd.entries()).expect(""); if rd.ss().map_or(false, |ss| ss.leader_id == raw_node.raft.id) { - raw_node.advance(rd); + let _ = raw_node.advance(rd); break; } - raw_node.advance(rd); + let _ = raw_node.advance(rd); } // propose add learner node and check apply state @@ -515,13 +545,15 @@ fn test_raw_node_propose_add_learner_node() -> Result<()> { let rd = raw_node.ready(); s.wl().append(rd.entries()).expect(""); + let light_rd = raw_node.advance(rd); + assert_eq!( - rd.committed_entries.as_ref().unwrap().len(), + light_rd.committed_entries().len(), 1, "should committed the conf change entry" ); - let e = &rd.committed_entries.as_ref().unwrap()[0]; + let e = &light_rd.committed_entries()[0]; assert_eq!(e.get_entry_type(), EntryType::EntryConfChange); let mut conf_change = ConfChange::default(); conf_change.merge_from_bytes(&e.data).unwrap(); @@ -550,22 +582,22 @@ fn test_raw_node_read_index() { let rd = raw_node.ready(); s.wl().append(rd.entries()).expect(""); if rd.ss().map_or(false, |ss| ss.leader_id == raw_node.raft.id) { - raw_node.advance(rd); + let _ = raw_node.advance(rd); // Once we are the leader, issue a read index request raw_node.read_index(wrequest_ctx); break; } - raw_node.advance(rd); + let _ = raw_node.advance(rd); } // ensure the read_states can be read out assert!(!raw_node.raft.read_states.is_empty()); assert!(raw_node.has_ready()); let rd = raw_node.ready(); - assert_eq!(rd.read_states(), wrs.as_slice()); - s.wl().append(&rd.entries()).expect(""); - raw_node.advance(rd); + assert_eq!(*rd.read_states(), wrs); + s.wl().append(rd.entries()).expect(""); + let _ = raw_node.advance(rd); // ensure raft.read_states is reset after advance assert!(!raw_node.has_ready()); @@ -582,35 +614,46 @@ fn 
test_raw_node_start() { let mut raw_node = new_raw_node(1, vec![1], 10, 1, store.clone(), &l); let rd = raw_node.ready(); - must_cmp_ready(&rd, &None, &None, &[], vec![], false); - store.wl().append(rd.entries()).unwrap(); - raw_node.advance(rd); + must_cmp_ready(&rd, &None, &None, &[], &[], &None, true, false); + let _ = raw_node.advance(rd); raw_node.campaign().expect(""); let rd = raw_node.ready(); must_cmp_ready( &rd, &Some(soft_state(1, StateRole::Leader)), - &Some(hard_state(2, 2, 1)), + &Some(hard_state(2, 1, 1)), &[new_entry(2, 2, None)], - vec![new_entry(2, 2, None)], + &[], + &None, + true, true, ); store.wl().append(rd.entries()).expect(""); - raw_node.advance(rd); + let light_rd = raw_node.advance(rd); + assert_eq!(light_rd.commit_index(), Some(2)); + assert_eq!(*light_rd.committed_entries(), vec![new_entry(2, 2, None)]); + assert!(!raw_node.has_ready()); raw_node.propose(vec![], b"somedata".to_vec()).expect(""); let rd = raw_node.ready(); must_cmp_ready( &rd, &None, - &Some(hard_state(2, 3, 1)), + &None, &[new_entry(2, 3, SOME_DATA)], - vec![new_entry(2, 3, SOME_DATA)], + &[], + &None, + true, true, ); store.wl().append(rd.entries()).expect(""); - raw_node.advance(rd); + let light_rd = raw_node.advance(rd); + assert_eq!(light_rd.commit_index(), Some(3)); + assert_eq!( + *light_rd.committed_entries(), + vec![new_entry(2, 3, SOME_DATA)] + ); assert!(!raw_node.has_ready()); } @@ -628,8 +671,8 @@ fn test_raw_node_restart() { }; let rd = raw_node.ready(); - must_cmp_ready(&rd, &None, &None, &[], entries[..1].to_vec(), false); - raw_node.advance(rd); + must_cmp_ready(&rd, &None, &None, &[], &entries[..1], &None, true, false); + let _ = raw_node.advance(rd); assert!(!raw_node.has_ready()); } @@ -640,8 +683,7 @@ fn test_raw_node_restart_from_snapshot() { let entries = vec![new_entry(1, 3, Some("foo"))]; let mut raw_node = { - let raw_node = new_raw_node(1, vec![], 10, 1, new_storage(), &l); - let store = raw_node.raft.r.raft_log.store; + let store = 
new_storage(); store.wl().apply_snapshot(snap).unwrap(); store.wl().append(&entries).unwrap(); store.wl().set_hardstate(hard_state(1, 3, 0)); @@ -649,8 +691,8 @@ fn test_raw_node_restart_from_snapshot() { }; let rd = raw_node.ready(); - must_cmp_ready(&rd, &None, &None, &[], entries, false); - raw_node.advance(rd); + must_cmp_ready(&rd, &None, &None, &[], &entries, &None, true, false); + let _ = raw_node.advance(rd); assert!(!raw_node.has_ready()); } @@ -761,29 +803,577 @@ fn test_bounded_uncommitted_entries_growth_with_partition() { .ss() .map_or(false, |ss| ss.leader_id == raw_node.raft.leader_id) { + let _ = raw_node.advance(rd); break; } - raw_node.advance(rd); + let _ = raw_node.advance(rd); } // should be accepted - let data = b"hello world!".to_vec(); - let result = raw_node.propose(vec![], data); + let data = b"hello world!"; + let result = raw_node.propose(vec![], data.to_vec()); assert!(result.is_ok()); // shoule be dropped - let data = b"hello world!".to_vec(); - let result = raw_node.propose(vec![], data); + let result = raw_node.propose(vec![], data.to_vec()); assert!(!result.is_ok()); assert_eq!(result.unwrap_err(), Error::ProposalDropped); // should be accepted when previous data has been committed let rd = raw_node.ready(); s.wl().append(rd.entries()).unwrap(); - raw_node.advance(rd); + let _ = raw_node.advance(rd); let data = b"hello world!".to_vec(); let result = raw_node.propose(vec![], data); assert!(result.is_ok()); } + +#[test] +fn test_raw_node_with_async_apply() { + let l = default_logger(); + let s = new_storage(); + s.wl().apply_snapshot(new_snapshot(1, 1, vec![1])).unwrap(); + + let mut raw_node = new_raw_node(1, vec![1], 10, 1, s.clone(), &l); + raw_node.campaign().unwrap(); + let rd = raw_node.ready(); + // Single node should become leader. 
+ assert!(rd + .ss() + .map_or(false, |ss| ss.leader_id == raw_node.raft.leader_id)); + s.wl().append(&rd.entries()).unwrap(); + let _ = raw_node.advance(rd); + + let mut last_index = raw_node.raft.raft_log.last_index(); + + let data = b"hello world!"; + + for _ in 1..10 { + let cnt = rand::random::() % 10 + 1; + for _ in 0..cnt { + raw_node.propose(vec![], data.to_vec()).unwrap(); + } + + let rd = raw_node.ready(); + let entries = rd.entries().clone(); + assert_eq!(entries.first().unwrap().get_index(), last_index + 1); + assert_eq!(entries.last().unwrap().get_index(), last_index + cnt); + must_cmp_ready(&rd, &None, &None, &entries, &[], &None, true, true); + + s.wl().append(&entries).unwrap(); + + let light_rd = raw_node.advance_append(rd); + assert_eq!(entries, *light_rd.committed_entries()); + assert_eq!(light_rd.commit_index(), Some(last_index + cnt)); + + // No matter how applied index changes, the index of next committed + // entries should be the same. + raw_node.advance_apply_to(last_index + 1); + assert!(!raw_node.has_ready()); + + last_index += cnt; + } +} + +/// Test if the ready process is expected when a follower receives a snapshot +/// and some committed entries after its snapshot. 
+#[test] +fn test_raw_node_entries_after_snapshot() { + let l = default_logger(); + let s = new_storage(); + s.wl() + .apply_snapshot(new_snapshot(1, 1, vec![1, 2])) + .unwrap(); + + let mut raw_node = new_raw_node(1, vec![1, 2], 10, 1, s.clone(), &l); + + let snapshot = new_snapshot(10, 2, vec![1, 2]); + let mut snapshot_msg = new_message(2, 1, MessageType::MsgSnapshot, 0); + snapshot_msg.set_term(2); + snapshot_msg.set_snapshot(snapshot.clone()); + raw_node.step(snapshot_msg).unwrap(); + + let entries = [ + new_entry(2, 11, Some("hello")), + new_entry(2, 12, Some("hello")), + new_entry(2, 13, Some("hello")), + ]; + let mut append_msg = new_message_with_entries(2, 1, MessageType::MsgAppend, entries.to_vec()); + append_msg.set_term(2); + append_msg.set_index(10); + append_msg.set_log_term(2); + append_msg.set_commit(12); + raw_node.step(append_msg).unwrap(); + + let rd = raw_node.ready(); + // If there is a snapshot, the committed entries should be empty. + must_cmp_ready( + &rd, + &Some(soft_state(2, StateRole::Follower)), + &Some(hard_state(2, 12, 0)), + &entries, + &[], + &Some(snapshot), + true, + true, + ); + s.wl().set_hardstate(rd.hs().unwrap().clone()); + s.wl().apply_snapshot(rd.snapshot().clone()).unwrap(); + s.wl().append(rd.entries()).unwrap(); + + let light_rd = raw_node.advance(rd); + assert_eq!(light_rd.commit_index(), None); + assert_eq!(light_rd.committed_entries().as_slice(), &entries[..2]); + // Should have a MsgAppendResponse + assert_eq!( + light_rd.messages()[0][0].get_msg_type(), + MessageType::MsgAppendResponse + ); +} + +/// Test if the given committed entries are persisted when some persisted +/// entries are overwritten by a new leader. 
+#[test] +fn test_raw_node_overwrite_entries() { + let l = default_logger(); + let s = new_storage(); + s.wl() + .apply_snapshot(new_snapshot(1, 1, vec![1, 2, 3])) + .unwrap(); + + let mut raw_node = new_raw_node(1, vec![1, 2, 3], 10, 1, s.clone(), &l); + + let entries = [ + new_entry(2, 2, Some("hello")), + new_entry(2, 3, Some("hello")), + new_entry(2, 4, Some("hello")), + ]; + let mut append_msg = new_message_with_entries(2, 1, MessageType::MsgAppend, entries.to_vec()); + append_msg.set_term(2); + append_msg.set_index(1); + append_msg.set_log_term(1); + append_msg.set_commit(1); + raw_node.step(append_msg).unwrap(); + + let rd = raw_node.ready(); + must_cmp_ready( + &rd, + &Some(soft_state(2, StateRole::Follower)), + &Some(hard_state(2, 1, 0)), + &entries, + &[], + &None, + true, + true, + ); + s.wl().set_hardstate(rd.hs().unwrap().clone()); + s.wl().append(rd.entries()).unwrap(); + + let light_rd = raw_node.advance(rd); + assert_eq!(light_rd.commit_index(), None); + assert!(light_rd.committed_entries().is_empty()); + // Append entries response + assert!(!light_rd.messages().is_empty()); + + let entries_2 = [ + new_entry(3, 4, Some("hello")), + new_entry(3, 5, Some("hello")), + new_entry(3, 6, Some("hello")), + ]; + let mut append_msg = new_message_with_entries(3, 1, MessageType::MsgAppend, entries_2.to_vec()); + append_msg.set_term(3); + append_msg.set_index(3); + append_msg.set_log_term(2); + append_msg.set_commit(5); + raw_node.step(append_msg).unwrap(); + + let rd = raw_node.ready(); + must_cmp_ready( + &rd, + &Some(soft_state(3, StateRole::Follower)), + &Some(hard_state(3, 5, 0)), + &entries_2, + &entries[..2], + &None, + true, + true, + ); + s.wl().set_hardstate(rd.hs().unwrap().clone()); + s.wl().append(rd.entries()).unwrap(); + + let light_rd = raw_node.advance(rd); + assert_eq!(light_rd.commit_index(), None); + assert_eq!(light_rd.committed_entries().as_slice(), &entries_2[..2]); + // Append entries response + assert!(!light_rd.messages().is_empty()); 
+}
+
+/// Test if async ready process is expected when a leader receives
+/// the append response and persists its entries.
+#[test]
+fn test_async_ready_leader() {
+    let l = default_logger();
+    let s = new_storage();
+    s.wl()
+        .apply_snapshot(new_snapshot(1, 1, vec![1, 2, 3]))
+        .unwrap();
+
+    let mut raw_node = new_raw_node(1, vec![1, 2, 3], 10, 1, s.clone(), &l);
+    raw_node.raft.become_candidate();
+    raw_node.raft.become_leader();
+    let rd = raw_node.ready();
+    assert!(rd
+        .ss()
+        .map_or(false, |ss| ss.leader_id == raw_node.raft.leader_id));
+    s.wl().append(&rd.entries()).unwrap();
+    let _ = raw_node.advance(rd);
+
+    assert_eq!(raw_node.raft.term, 2);
+    let mut first_index = raw_node.raft.raft_log.last_index();
+
+    let data = b"hello world!";
+
+    // Set node 2 progress to replicate
+    raw_node.raft.mut_prs().get_mut(2).unwrap().matched = 1;
+    raw_node
+        .raft
+        .mut_prs()
+        .get_mut(2)
+        .unwrap()
+        .become_replicate();
+    for i in 0..10 {
+        for _ in 0..10 {
+            raw_node.propose(vec![], data.to_vec()).unwrap();
+        }
+
+        let mut rd = raw_node.ready();
+        assert_eq!(rd.number(), i + 2);
+        let entries = rd.entries().clone();
+        assert_eq!(
+            entries.first().unwrap().get_index(),
+            first_index + i * 10 + 1
+        );
+        assert_eq!(
+            entries.last().unwrap().get_index(),
+            first_index + i * 10 + 10
+        );
+        // Leader's msg can be sent immediately.
+ must_cmp_ready(&rd, &None, &None, &entries, &[], &None, false, true); + for vec_msg in rd.take_messages() { + for msg in vec_msg { + assert_eq!(msg.get_msg_type(), MessageType::MsgAppend); + } + } + + s.wl().append(&entries).unwrap(); + raw_node.advance_append_async(rd); + } + // Unpersisted Ready number in range [2, 11] + raw_node.on_persist_ready(4); + // No new committed entries due to two nodes in this cluster + assert!(!raw_node.has_ready()); + + // The index of uncommitted entries in range [first_index, first_index + 100] + let mut append_response = new_message(2, 1, MessageType::MsgAppendResponse, 0); + append_response.set_term(2); + append_response.set_index(first_index + 100); + + raw_node.step(append_response).unwrap(); + + // Forward commit index due to append response + let rd = raw_node.ready(); + assert_eq!(rd.hs(), Some(&hard_state(2, first_index + 30, 1))); + assert_eq!( + rd.committed_entries().first().unwrap().get_index(), + first_index + ); + assert_eq!( + rd.committed_entries().last().unwrap().get_index(), + first_index + 30 + ); + assert!(!rd.messages().is_empty()); + s.wl().set_hardstate(rd.hs().unwrap().clone()); + raw_node.advance_append_async(rd); + + // Forward commit index due to persist ready + raw_node.on_persist_ready(8); + let rd = raw_node.ready(); + assert_eq!(rd.hs(), Some(&hard_state(2, first_index + 70, 1))); + assert_eq!( + rd.committed_entries().first().unwrap().get_index(), + first_index + 31 + ); + assert_eq!( + rd.committed_entries().last().unwrap().get_index(), + first_index + 70 + ); + assert!(!rd.messages().is_empty()); + s.wl().set_hardstate(rd.hs().unwrap().clone()); + + // Forward commit index due to persist last ready + let light_rd = raw_node.advance_append(rd); + assert_eq!(light_rd.commit_index(), Some(first_index + 100)); + assert_eq!( + light_rd.committed_entries().first().unwrap().get_index(), + first_index + 71 + ); + assert_eq!( + light_rd.committed_entries().last().unwrap().get_index(), + first_index + 100 
+    );
+    assert!(!light_rd.messages().is_empty());
+
+    // Test when 2 followers respond to the append entries msg and leader has
+    // not persisted them yet.
+    first_index += 100;
+    for _ in 0..10 {
+        raw_node.propose(vec![], data.to_vec()).unwrap();
+    }
+
+    let mut rd = raw_node.ready();
+    assert_eq!(rd.number(), 14);
+    let entries = rd.entries().clone();
+    assert_eq!(entries.first().unwrap().get_index(), first_index + 1);
+    assert_eq!(entries.last().unwrap().get_index(), first_index + 10);
+    // Leader's msg can be sent immediately.
+    must_cmp_ready(&rd, &None, &None, &entries, &[], &None, false, true);
+    for vec_msg in rd.take_messages() {
+        for msg in vec_msg {
+            assert_eq!(msg.get_msg_type(), MessageType::MsgAppend);
+        }
+    }
+    s.wl().append(&entries).unwrap();
+    raw_node.advance_append_async(rd);
+
+    let mut append_response = new_message(2, 1, MessageType::MsgAppendResponse, 0);
+    append_response.set_term(2);
+    append_response.set_index(first_index + 9);
+
+    raw_node.step(append_response).unwrap();
+
+    let mut append_response = new_message(3, 1, MessageType::MsgAppendResponse, 0);
+    append_response.set_term(2);
+    append_response.set_index(first_index + 10);
+
+    raw_node.step(append_response).unwrap();
+
+    let mut rd = raw_node.ready();
+    // It should have some append msgs and its commit index should be first_index + 9.
+ must_cmp_ready( + &rd, + &None, + &Some(hard_state(2, first_index + 9, 1)), + &[], + &[], + &None, + false, + false, + ); + for vec_msg in rd.take_messages() { + for msg in vec_msg { + assert_eq!(msg.get_msg_type(), MessageType::MsgAppend); + assert_eq!(msg.get_commit(), first_index + 9); + } + } + + // Forward commit index due to peer 1's append response and persisted entries + let light_rd = raw_node.advance_append(rd); + assert_eq!(light_rd.commit_index(), Some(first_index + 10)); + assert_eq!( + light_rd.committed_entries().first().unwrap().get_index(), + first_index + 1 + ); + assert_eq!( + light_rd.committed_entries().last().unwrap().get_index(), + first_index + 10 + ); + assert!(!light_rd.messages().is_empty()); +} + +/// Test if async ready process is expected when a follower receives +/// some append msg. +#[test] +fn test_async_ready_follower() { + let l = default_logger(); + let s = new_storage(); + s.wl() + .apply_snapshot(new_snapshot(1, 1, vec![1, 2])) + .unwrap(); + + let mut raw_node = new_raw_node(1, vec![1, 2], 10, 1, s.clone(), &l); + let mut first_index = 1; + let mut rd_number = 1; + for cnt in 0..3 { + for i in 0..10 { + let entries = [ + new_entry(2, first_index + i * 3 + 1, Some("hello")), + new_entry(2, first_index + i * 3 + 2, Some("hello")), + new_entry(2, first_index + i * 3 + 3, Some("hello")), + ]; + let mut append_msg = + new_message_with_entries(2, 1, MessageType::MsgAppend, entries.to_vec()); + append_msg.set_term(2); + append_msg.set_index(first_index + i * 3); + if cnt == 0 && i == 0 { + append_msg.set_log_term(1); + } else { + append_msg.set_log_term(2); + } + append_msg.set_commit(first_index + i * 3 + 3); + raw_node.step(append_msg).unwrap(); + + let rd = raw_node.ready(); + assert_eq!(rd.number(), rd_number + i); + assert_eq!(rd.hs(), Some(&hard_state(2, first_index + i * 3 + 3, 0))); + assert_eq!(rd.entries(), &entries); + assert_eq!(rd.committed_entries().as_slice(), &[]); + assert!(rd.messages().is_empty()); + + 
s.wl().set_hardstate(rd.hs().unwrap().clone()); + s.wl().append(rd.entries()).unwrap(); + raw_node.advance_append_async(rd); + } + // Unpersisted Ready number in range [1, 10] + raw_node.on_persist_ready(rd_number + 3); + let mut rd = raw_node.ready(); + assert_eq!(rd.hs(), None); + assert_eq!( + rd.committed_entries().first().unwrap().get_index(), + first_index + 1 + ); + assert_eq!( + rd.committed_entries().last().unwrap().get_index(), + first_index + 3 * 3 + 3 + ); + let mut msg_num = 0; + for vec_msg in rd.take_messages() { + for msg in vec_msg { + assert_eq!(msg.get_msg_type(), MessageType::MsgAppendResponse); + msg_num += 1; + } + } + assert_eq!(msg_num, 4); + + let mut light_rd = raw_node.advance_append(rd); + assert_eq!(light_rd.commit_index(), None); + assert_eq!( + light_rd.committed_entries().first().unwrap().get_index(), + first_index + 3 * 3 + 4 + ); + assert_eq!( + light_rd.committed_entries().last().unwrap().get_index(), + first_index + 10 * 3 + ); + let mut msg_num = 0; + for vec_msg in light_rd.take_messages() { + for msg in vec_msg { + assert_eq!(msg.get_msg_type(), MessageType::MsgAppendResponse); + msg_num += 1; + } + } + assert_eq!(msg_num, 6); + + first_index += 10 * 3; + rd_number += 11; + } +} + +/// Test if a new leader immediately sends all messages recorded before without +/// persisting. 
+#[test] +fn test_async_ready_become_leader() { + let l = default_logger(); + let s = new_storage(); + s.wl() + .apply_snapshot(new_snapshot(5, 5, vec![1, 2, 3])) + .unwrap(); + + let mut raw_node = new_raw_node(1, vec![1, 2, 3], 10, 1, s.clone(), &l); + for _ in 1..raw_node.raft.election_timeout() * 2 { + raw_node.raft.tick_election(); + } + let rd = raw_node.ready(); + assert_eq!(rd.number(), 1); + must_cmp_ready( + &rd, + &Some(soft_state(0, StateRole::Candidate)), + &Some(hard_state(6, 5, 1)), + &[], + &[], + &None, + true, + true, + ); + s.wl().set_hardstate(rd.hs().unwrap().clone()); + + let mut light_rd = raw_node.advance_append(rd); + for vec_msg in light_rd.take_messages() { + for msg in vec_msg { + assert_eq!(msg.get_msg_type(), MessageType::MsgRequestVote); + } + } + + // Peer 1 should reject to vote to peer 2 + let mut vote_request_2 = new_message(2, 1, MessageType::MsgRequestVote, 0); + vote_request_2.set_term(6); + vote_request_2.set_log_term(4); + vote_request_2.set_index(4); + raw_node.step(vote_request_2).unwrap(); + + let rd = raw_node.ready(); + assert_eq!(rd.number(), 2); + must_cmp_ready(&rd, &None, &None, &[], &[], &None, true, false); + raw_node.advance_append_async(rd); + + // Peer 1 should reject to vote to peer 2 + let mut vote_request_3 = new_message(3, 1, MessageType::MsgRequestVote, 0); + vote_request_3.set_term(6); + vote_request_3.set_log_term(4); + vote_request_3.set_index(4); + raw_node.step(vote_request_3).unwrap(); + + let rd = raw_node.ready(); + assert_eq!(rd.number(), 3); + must_cmp_ready(&rd, &None, &None, &[], &[], &None, true, false); + raw_node.advance_append_async(rd); + + // Peer 1 receives the vote from peer 2 + let mut vote_response_2 = new_message(2, 1, MessageType::MsgRequestVoteResponse, 0); + vote_response_2.set_term(6); + vote_response_2.set_reject(false); + raw_node.step(vote_response_2).unwrap(); + + let mut rd = raw_node.ready(); + assert_eq!(rd.number(), 4); + assert_eq!(rd.entries().len(), 1); + 
must_cmp_ready( + &rd, + &Some(soft_state(1, StateRole::Leader)), + &None, + rd.entries(), + &[], + &None, + false, + true, + ); + // 2 vote reject + 2 append entries + let mut count = 0; + for vec_msg in rd.take_messages() { + for msg in vec_msg { + let msg_type = match count { + 0 | 1 => MessageType::MsgRequestVoteResponse, + _ => MessageType::MsgAppend, + }; + assert_eq!(msg.get_msg_type(), msg_type); + count += 1; + } + } + assert_eq!(count, 4); + + let light_rd = raw_node.advance_append(rd); + assert_eq!(light_rd.commit_index(), None); + assert!(light_rd.committed_entries().is_empty()); + assert!(light_rd.messages().is_empty()); +} diff --git a/src/lib.rs b/src/lib.rs index aa96c075c..12e3ee9a2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -205,12 +205,12 @@ let mut ready = node.ready(); The `Ready` state contains quite a bit of information, and you need to check and process them one by one: -1. Check whether `snapshot` is empty or not. If not empty, it means that the Raft node has received -a Raft snapshot from the leader and we must apply the snapshot: +1. Check whether `messages` is empty or not. If not, it means that the node will send messages to +other nodes: ```rust # use slog::{Drain, o}; - # use raft::{Config, storage::MemStorage, raw_node::RawNode}; + # use raft::{Config, storage::MemStorage, raw_node::RawNode, StateRole}; # # let config = Config { id: 1, ..Default::default() }; # config.validate().unwrap(); @@ -222,19 +222,18 @@ a Raft snapshot from the leader and we must apply the snapshot: # return; # } # let mut ready = node.ready(); + # let is_leader = node.raft.state == StateRole::Leader; # - if !ready.snapshot().is_empty() { - // This is a snapshot, we need to apply the snapshot at first. - node.mut_store() - .wl() - .apply_snapshot(ready.snapshot().clone()) - .unwrap(); + let msgs = ready.take_messages(); + for vec_msg in msgs { + for _msg in vec_msg { + // Send messages to other peers. + } } - ``` -2. Check whether `entries` is empty or not. 
If not empty, it means that there are newly added
-entries but has not been committed yet, we must append the entries to the Raft log:
+2. Check whether `snapshot` is empty or not. If not empty, it means that the Raft node has received
+a Raft snapshot from the leader and we must apply the snapshot:
 ```rust
     # use slog::{Drain, o};
@@ -251,20 +250,23 @@ entries but has not been committed yet, we must append the entries to the Raft l
     # }
     # let mut ready = node.ready();
     #
-    if !ready.entries().is_empty() {
-        // Append entries to the Raft log
-        node.mut_store().wl().append(ready.entries()).unwrap();
+    if !ready.snapshot().is_empty() {
+        // This is a snapshot, we need to apply the snapshot at first.
+        node.mut_store()
+            .wl()
+            .apply_snapshot(ready.snapshot().clone())
+            .unwrap();
     }
 ```
-3. Check whether `hs` is empty or not. If not empty, it means that the `HardState` of the node has
-changed. For example, the node may vote for a new leader, or the commit index has been increased.
-We must persist the changed `HardState`:
+3. Check whether `committed_entries` is empty or not. If not, it means that there are some newly
+committed log entries which you must apply to the state machine. Of course, after applying, you
+need to update the applied index and resume `apply` later:
 ```rust
     # use slog::{Drain, o};
-    # use raft::{Config, storage::MemStorage, raw_node::RawNode};
+    # use raft::{Config, storage::MemStorage, raw_node::RawNode, eraftpb::EntryType};
     #
     # let config = Config { id: 1, ..Default::default() };
     # config.validate().unwrap();
@@ -277,20 +279,37 @@ We must persist the changed `HardState`:
     # }
     # let mut ready = node.ready();
     #
-    if let Some(hs) = ready.hs() {
-        // Raft HardState changed, and we need to persist it.
-    node.mut_store().wl().set_hardstate(hs.clone());
+    # fn handle_conf_change(e: raft::eraftpb::Entry) {
+    # }
+    #
+    # fn handle_normal(e: raft::eraftpb::Entry) {
+    # }
+    #
+    let mut _last_apply_index = 0;
+    for entry in ready.take_committed_entries() {
+        // Mostly, you need to save the last apply index to resume applying
+        // after restart. Here we just ignore this because we use a Memory storage.
+        _last_apply_index = entry.index;
+
+        if entry.data.is_empty() {
+            // Empty entry, when the peer becomes Leader it will send an empty entry.
+            continue;
+        }
+
+        match entry.get_entry_type() {
+            EntryType::EntryNormal => handle_normal(entry),
+            EntryType::EntryConfChange => handle_conf_change(entry),
+            EntryType::EntryConfChangeV2 => unimplemented!(),
+        }
+    }
 ```
-4. Check whether `messages` is empty or not. If not, it means that the node will send messages to
-other nodes. There has been an optimization for sending messages: if the node is a leader, this can
-be done together with step 1 in parallel; if the node is not a leader, it needs to reply the
-messages to the leader after appending the Raft entries:
+4. Check whether `entries` is empty or not. If not empty, it means that there are newly added
+entries but have not been committed yet, we must append the entries to the Raft log:
 ```rust
     # use slog::{Drain, o};
-    # use raft::{Config, storage::MemStorage, raw_node::RawNode, StateRole};
     #
     # let config = Config { id: 1, ..Default::default() };
     # config.validate().unwrap();
@@ -302,25 +321,21 @@ messages to the leader after appending the Raft entries:
     # return;
     # }
     # let mut ready = node.ready();
-    # let is_leader = node.raft.state == StateRole::Leader;
     #
-    if !is_leader {
-        // If not leader, the follower needs to reply the messages to
-        // the leader after appending Raft entries.
-        let msgs = ready.messages.drain(..);
-        for _msg in msgs {
-            // Send messages to other peers.
- } + if !ready.entries().is_empty() { + // Append entries to the Raft log + node.mut_store().wl().append(ready.entries()).unwrap(); } + ``` -5. Check whether `committed_entires` is empty or not. If not, it means that there are some newly -committed log entries which you must apply to the state machine. Of course, after applying, you -need to update the applied index and resume `apply` later: +5. Check whether `hs` is empty or not. If not empty, it means that the `HardState` of the node has +changed. For example, the node may vote for a new leader, or the commit index has been increased. +We must persist the changed `HardState`: ```rust # use slog::{Drain, o}; - # use raft::{Config, storage::MemStorage, raw_node::RawNode, eraftpb::EntryType}; + # use raft::{Config, storage::MemStorage, raw_node::RawNode}; # # let config = Config { id: 1, ..Default::default() }; # config.validate().unwrap(); @@ -333,38 +348,20 @@ need to update the applied index and resume `apply` later: # } # let mut ready = node.ready(); # - # fn handle_conf_change(e: raft::eraftpb::Entry) { - # } - # - # fn handle_normal(e: raft::eraftpb::Entry) { - # } - # - if let Some(committed_entries) = ready.committed_entries.take() { - let mut _last_apply_index = 0; - for entry in committed_entries { - // Mostly, you need to save the last apply index to resume applying - // after restart. Here we just ignore this because we use a Memory storage. - _last_apply_index = entry.index; - - if entry.data.is_empty() { - // Emtpy entry, when the peer becomes Leader it will send an empty entry. - continue; - } - - match entry.get_entry_type() { - EntryType::EntryNormal => handle_normal(entry), - EntryType::EntryConfChange => handle_conf_change(entry), - EntryType::EntryConfChangeV2 => unimplemented!(), - } - } + if let Some(hs) = ready.hs() { + // Raft HardState changed, and we need to persist it. + node.mut_store().wl().set_hardstate(hs.clone()); } ``` -6. Call `advance` to prepare for the next `Ready` state. +6. 
Call `advance` to notify that the previous work is completed. Get the return value `LightReady` +and handle its `messages` and `committed_entries` like step 1 and step 3 does. Then call `advance_apply` +to advance the applied index inside. ```rust # use slog::{Drain, o}; - # use raft::{Config, storage::MemStorage, raw_node::RawNode, eraftpb::EntryType}; + # use raft::{Config, storage::MemStorage, raw_node::RawNode}; + # use raft::eraftpb::{EntryType, Entry, Message}; # # let config = Config { id: 1, ..Default::default() }; # config.validate().unwrap(); @@ -377,7 +374,16 @@ need to update the applied index and resume `apply` later: # } # let mut ready = node.ready(); # - node.advance(ready); + # fn handle_messages(msgs: Vec>) { + # } + # + # fn handle_committed_entries(committed_entries: Vec) { + # } + let mut light_rd = node.advance(ready); + // Like step 1 and 5, you can use functions to make them behave the same. + handle_messages(light_rd.take_messages()); + handle_committed_entries(light_rd.take_committed_entries()); + node.advance_apply(); ``` For more information, check out an [example](examples/single_mem_node/main.rs#L113-L179). @@ -499,7 +505,7 @@ pub use self::tracker::{Inflights, Progress, ProgressState, ProgressTracker}; #[allow(deprecated)] pub use self::raw_node::is_empty_snap; -pub use self::raw_node::{Peer, RawNode, Ready, SnapshotStatus}; +pub use self::raw_node::{LightReady, Peer, RawNode, Ready, SnapshotStatus}; pub use self::read_only::{ReadOnlyOption, ReadState}; pub use self::status::Status; pub use self::storage::{RaftState, Storage}; diff --git a/src/log_unstable.rs b/src/log_unstable.rs index 829a3882d..21ea5e3b7 100644 --- a/src/log_unstable.rs +++ b/src/log_unstable.rs @@ -85,29 +85,18 @@ impl Unstable { } } - /// Moves the stable offset up to the index. Provided that the index - /// is in the same election term. 
- pub fn stable_to(&mut self, idx: u64, term: u64) { - let t = self.maybe_term(idx); - if t.is_none() { - return; - } - - if t.unwrap() == term && idx >= self.offset { - let start = idx + 1 - self.offset; - self.entries.drain(..start as usize); - self.offset = idx + 1; + /// Clears the unstable entries and moves the stable offset up to the + /// last index, if there is any. + pub fn stable_entries(&mut self) { + if let Some(entry) = self.entries.last() { + self.offset = entry.get_index() + 1; + self.entries.clear(); } } - /// Removes the snapshot from self if the index of the snapshot matches - pub fn stable_snap_to(&mut self, idx: u64) { - if self.snapshot.is_none() { - return; - } - if idx == self.snapshot.as_ref().unwrap().get_metadata().index { - self.snapshot = None; - } + /// Clears the unstable snapshot. + pub fn stable_snap(&mut self) { + self.snapshot = None; } /// From a given snapshot, restores the snapshot to self, but doesn't unpack. @@ -330,82 +319,18 @@ mod test { } #[test] - fn test_stable_to() { - // entries, offset, snap, index, term, woffset, wlen - let tests = vec![ - (vec![], 0, None, 5, 1, 0, 0), - // stable to the first entry - (vec![new_entry(5, 1)], 5, None, 5, 1, 6, 0), - (vec![new_entry(5, 1), new_entry(6, 0)], 5, None, 5, 1, 6, 1), - // stable to the first entry and term mismatch - (vec![new_entry(6, 2)], 5, None, 6, 1, 5, 1), - // stable to old entry - (vec![new_entry(5, 1)], 5, None, 4, 1, 5, 1), - (vec![new_entry(5, 1)], 5, None, 4, 2, 5, 1), - // with snapshot - // stable to the first entry - ( - vec![new_entry(5, 1)], - 5, - Some(new_snapshot(4, 1)), - 5, - 1, - 6, - 0, - ), - // stable to the first entry - ( - vec![new_entry(5, 1), new_entry(6, 1)], - 5, - Some(new_snapshot(4, 1)), - 5, - 1, - 6, - 1, - ), - // stable to the first entry and term mismatch - ( - vec![new_entry(6, 2)], - 5, - Some(new_snapshot(5, 1)), - 6, - 1, - 5, - 1, - ), - // stable to snapshot - ( - vec![new_entry(5, 1)], - 5, - Some(new_snapshot(4, 1)), - 4, - 
1, - 5, - 1, - ), - // stable to old entry - ( - vec![new_entry(5, 2)], - 5, - Some(new_snapshot(4, 2)), - 4, - 1, - 5, - 1, - ), - ]; - - for (entries, offset, snapshot, index, term, woffset, wlen) in tests { - let mut u = Unstable { - entries, - offset, - snapshot, - logger: crate::default_logger(), - }; - u.stable_to(index, term); - assert_eq!(u.offset, woffset); - assert_eq!(u.entries.len(), wlen); - } + fn test_stable_entries() { + let ents = vec![new_entry(5, 1), new_entry(5, 2), new_entry(6, 3)]; + let mut u = Unstable { + entries: ents.clone(), + offset: 5, + snapshot: Some(new_snapshot(4, 1)), + logger: crate::default_logger(), + }; + assert_eq!(ents, u.entries); + u.stable_entries(); + assert!(u.entries.is_empty()); + assert_eq!(u.offset, 7); } #[test] diff --git a/src/raft.rs b/src/raft.rs index 40fd5afcd..843e90bbe 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -940,11 +940,12 @@ impl Raft { let last_index = self.raft_log.last_index(); let committed = self.raft_log.committed; + let persisted = self.raft_log.persisted; let self_id = self.id; for (&id, mut pr) in self.mut_prs().iter_mut() { pr.reset(last_index + 1); if id == self_id { - pr.matched = last_index; + pr.matched = persisted; pr.committed_index = committed; } } @@ -959,23 +960,30 @@ impl Raft { return false; } - let mut li = self.raft_log.last_index(); + let li = self.raft_log.last_index(); for (i, e) in es.iter_mut().enumerate() { e.term = self.term; e.index = li + 1 + i as u64; } - // use latest "last" index after truncate/append - li = self.raft_log.append(es); - - let self_id = self.id; - self.mut_prs().get_mut(self_id).unwrap().maybe_update(li); - - // Regardless of maybe_commit's return, our caller will call bcastAppend. - self.maybe_commit(); + self.raft_log.append(es); + // Not update self's pr.matched until on_persist_entries true } + /// Notifies that these raft logs have been well persisted. 
+ pub fn on_persist_entries(&mut self, index: u64, term: u64) { + let update = self.raft_log.maybe_persist(index, term); + if update && self.state == StateRole::Leader { + let self_id = self.id; + let pr = self.mut_prs().get_mut(self_id).unwrap(); + pr.maybe_update(index); + if self.maybe_commit() && self.should_bcast_commit() { + self.bcast_append(); + } + } + } + /// Returns true to indicate that there will probably be some readiness need to be handled. pub fn tick(&mut self) -> bool { match self.state { @@ -1116,9 +1124,18 @@ impl Raft { self.leader_id = self.id; self.state = StateRole::Leader; - // update uncommitted state + let last_index = self.raft_log.last_index(); + // If there is only one peer, it becomes leader after starting + // so all logs must be persisted. + // If not, it becomes leader after sending RequestVote msg. + // Since all logs must be persisted before sending RequestVote + // msg and logs can not be changed when it's (pre)candidate, the + // last index is equal to persisted index when it becomes leader. + assert_eq!(last_index, self.raft_log.persisted); + + // Update uncommitted state self.uncommitted_state.uncommitted_size = 0; - self.uncommitted_state.last_log_tail_index = self.raft_log.last_index(); + self.uncommitted_state.last_log_tail_index = last_index; // Followers enter replicate mode when they've been successfully probed // (perhaps after having received a snapshot as a result). The leader is @@ -1132,9 +1149,9 @@ impl Raft { // safe to delay any future proposals until we commit all our // pending log entries, and scanning the entire tail of the log // could be expensive. 
-        self.pending_conf_index = self.raft_log.last_index();
+        self.pending_conf_index = last_index;
-        // no need to check result becase append_entry never refuse entries
+        // No need to check result because append_entry never refuses entries
         // which size is zero
         if !self.append_entry(&mut [Entry::default()]) {
             panic!("appending an empty entry should never be dropped")
diff --git a/src/raft_log.rs b/src/raft_log.rs
index 4b382e2e9..2acdbdfb2 100644
--- a/src/raft_log.rs
+++ b/src/raft_log.rs
@@ -37,12 +37,21 @@ pub struct RaftLog<T: Storage> {
     /// The highest log position that is known to be in stable storage
     /// on a quorum of nodes.
+    ///
+    /// Invariant: applied <= committed
     pub committed: u64,
+    /// The highest log position that is known to be persisted in stable
+    /// storage. It's used for limiting the upper bound of committed and
+    /// persisted entries.
+    ///
+    /// Invariant: persisted < unstable.offset && applied <= persisted
+    pub persisted: u64,
+
     /// The highest log position that the application has been instructed
     /// to apply to its state machine.
/// - /// Invariant: applied <= committed + /// Invariant: applied <= min(committed, persisted) pub applied: u64, } @@ -52,8 +61,9 @@ where { fn to_string(&self) -> String { format!( - "committed={}, applied={}, unstable.offset={}, unstable.entries.len()={}", + "committed={}, persisted={}, applied={}, unstable.offset={}, unstable.entries.len()={}", self.committed, + self.persisted, self.applied, self.unstable.offset, self.unstable.entries.len() @@ -71,6 +81,7 @@ impl RaftLog { RaftLog { store, committed: first_index - 1, + persisted: last_index, applied: first_index - 1, unstable: Unstable::new(last_index + 1, logger), } @@ -215,6 +226,10 @@ impl RaftLog { } else { let start = (conflict_idx - (idx + 1)) as usize; self.append(&ents[start..]); + // persisted should be decreased because entries are changed + if self.persisted > conflict_idx - 1 { + self.persisted = conflict_idx - 1; + } } let last_new_index = idx + ents.len() as u64; self.commit_to(cmp::min(committed, last_new_index)); @@ -255,13 +270,14 @@ impl RaftLog { if idx == 0 { return; } - if self.committed < idx || idx < self.applied { + if idx > cmp::min(self.committed, self.persisted) || idx < self.applied { fatal!( self.unstable.logger, - "applied({}) is out of range [prev_applied({}), committed({})", + "applied({}) is out of range [prev_applied({}), min(committed({}), persisted({}))]", idx, self.applied, - self.committed + self.committed, + self.persisted, ) } self.applied = idx; @@ -272,14 +288,15 @@ impl RaftLog { self.applied } - /// Attempts to set the stable up to a given index. - pub fn stable_to(&mut self, idx: u64, term: u64) { - self.unstable.stable_to(idx, term) + /// Clears the unstable entries and moves the stable offset up to the + /// last index, if there is any. + pub fn stable_entries(&mut self) { + self.unstable.stable_entries(); } - /// Snaps the unstable up to a current index. 
- pub fn stable_snap_to(&mut self, idx: u64) { - self.unstable.stable_snap_to(idx) + /// Clears the unstable snapshot. + pub fn stable_snap(&mut self) { + self.unstable.stable_snap(); } /// Returns a reference to the unstable log. @@ -287,6 +304,16 @@ impl RaftLog { &self.unstable } + /// Returns slice of entries that are not persisted. + pub fn unstable_entries(&self) -> &[Entry] { + &self.unstable.entries + } + + /// Returns the snapshot that are not persisted. + pub fn unstable_snapshot(&self) -> &Option { + &self.unstable.snapshot + } + /// Appends a set of entries to the unstable list. pub fn append(&mut self, ents: &[Entry]) -> u64 { trace!( @@ -311,14 +338,6 @@ impl RaftLog { self.last_index() } - /// Returns slice of entries that are not committed. - pub fn unstable_entries(&self) -> Option<&[Entry]> { - if self.unstable.entries.is_empty() { - return None; - } - Some(&self.unstable.entries) - } - /// Returns entries starting from a particular index and not exceeding a bytesize. pub fn entries(&self, idx: u64, max_size: impl Into>) -> Result> { let max_size = max_size.into(); @@ -354,12 +373,12 @@ impl RaftLog { term > self.last_term() || (term == self.last_term() && last_index >= self.last_index()) } - /// Returns any entries since the a particular index. + /// Returns committed and persisted entries since max(`since_idx` + 1, first_index). pub fn next_entries_since(&self, since_idx: u64) -> Option> { let offset = cmp::max(since_idx + 1, self.first_index()); - let committed = self.committed; - if committed + 1 > offset { - match self.slice(offset, committed + 1, None) { + let high = cmp::min(self.committed, self.persisted) + 1; + if high > offset { + match self.slice(offset, high, None) { Ok(vec) => return Some(vec), Err(e) => fatal!(self.unstable.logger, "{}", e), } @@ -374,10 +393,12 @@ impl RaftLog { self.next_entries_since(self.applied) } - /// Returns whether there are entries that can be applied between `since_idx` and the comitted index. 
+ /// Returns whether there are committed and persisted entries since + /// max(`since_idx` + 1, first_index). pub fn has_next_entries_since(&self, since_idx: u64) -> bool { let offset = cmp::max(since_idx + 1, self.first_index()); - self.committed + 1 > offset + let high = cmp::min(self.committed, self.persisted) + 1; + high > offset } /// Returns whether there are new entries. @@ -437,6 +458,25 @@ impl RaftLog { } } + /// Attempts to persist the index and term and returns whether it did. + pub fn maybe_persist(&mut self, index: u64, term: u64) -> bool { + // It's possible that the term check can be passed but index is greater + // than or equal to the unstable's offset in some corner cases. + // We handle these issues by not forwarding the persisted index. It's + // pretty intuitive because the offset means there are some entries whose + // index is greater than or equal to the offset has not been persisted yet. + if index > self.persisted + && index < self.unstable.offset + && self.term(index).map_or(false, |t| t == term) + { + debug!(self.unstable.logger, "persisted index {}", index); + self.persisted = index; + true + } else { + false + } + } + /// Grabs a slice of entries from the raft. Unlike a rust slice pointer, these are /// returned by value. The result is truncated to the max_size in bytes. pub fn slice( @@ -495,14 +535,23 @@ impl RaftLog { snapshot_index = snapshot.get_metadata().index, snapshot_term = snapshot.get_metadata().term, ); - self.committed = snapshot.get_metadata().index; + let index = snapshot.get_metadata().index; + assert!(index >= self.committed, "{} < {}", index, self.committed); + self.committed = index; + // persisted is just used for fetching committed entries. + // Here reset the persisted to index to satisfy its invariant which is + // persisted < unstable.offset and applied <= persisted. 
+ self.persisted = index; self.unstable.restore(snapshot); } } #[cfg(test)] mod test { - use std::panic::{self, AssertUnwindSafe}; + use std::{ + cmp, + panic::{self, AssertUnwindSafe}, + }; use crate::default_logger; use crate::eraftpb; @@ -672,14 +721,11 @@ mod test { for i in unstable_index..last_index { raft_log.append(&[new_entry(i as u64 + 1, i as u64 + 1)]); } - assert!( raft_log.maybe_commit(last_index, last_term), "maybe_commit return false" ); - let committed = raft_log.committed; - #[allow(deprecated)] - raft_log.applied_to(committed); + let offset = 500u64; raft_log.store.wl().compact(offset).expect("compact failed"); @@ -693,9 +739,9 @@ mod test { } { - let unstable_ents = raft_log.unstable_entries().expect("should have content."); - assert_eq!(250, unstable_ents.len()); - assert_eq!(751, unstable_ents[0].index); + let unstable_ents = raft_log.unstable_entries(); + assert_eq!(last_index - unstable_index, unstable_ents.len() as u64); + assert_eq!(unstable_index + 1, unstable_ents[0].index); } let mut prev = raft_log.last_index(); @@ -718,6 +764,8 @@ mod test { .expect("apply failed."); let mut raft_log = RaftLog::new(store, default_logger()); raft_log.restore(new_snapshot(unstablesnapi, 1)); + assert_eq!(raft_log.committed, unstablesnapi); + assert_eq!(raft_log.persisted, unstablesnapi); let tests = vec![ // cannot get term from storage @@ -776,99 +824,104 @@ mod test { .wl() .apply_snapshot(new_snapshot(index, term)) .expect("apply failed."); + let entries = vec![new_entry(index + 1, term), new_entry(index + 2, term + 1)]; + store.wl().append(&entries).expect(""); let raft_log = RaftLog::new(store, default_logger()); - assert!(raft_log.all_entries().is_empty()); + assert_eq!(raft_log.all_entries(), entries); assert_eq!(index + 1, raft_log.first_index()); assert_eq!(index, raft_log.committed); - assert_eq!(index + 1, raft_log.unstable.offset); - let actual_term = raft_log.term(index).expect(""); - assert_eq!(term, actual_term); + assert_eq!(index + 2, 
raft_log.persisted); + assert_eq!(index + 3, raft_log.unstable.offset); + + assert_eq!(term, raft_log.term(index).unwrap()); + assert_eq!(term, raft_log.term(index + 1).unwrap()); + assert_eq!(term + 1, raft_log.term(index + 2).unwrap()); } #[test] - fn test_stable_to_with_snap() { + fn test_maybe_persist_with_snap() { let l = default_logger(); let (snap_index, snap_term) = (5u64, 2u64); + // persisted_index, persisted_term, new_entries, wpersisted let tests = vec![ - (snap_index + 1, snap_term, vec![], snap_index + 1), - (snap_index, snap_term, vec![], snap_index + 1), - (snap_index - 1, snap_term, vec![], snap_index + 1), - (snap_index + 1, snap_term + 1, vec![], snap_index + 1), - (snap_index, snap_term + 1, vec![], snap_index + 1), - (snap_index - 1, snap_term + 1, vec![], snap_index + 1), + (snap_index + 1, snap_term, vec![], snap_index), + (snap_index, snap_term, vec![], snap_index), + (snap_index - 1, snap_term, vec![], snap_index), + (snap_index + 1, snap_term + 1, vec![], snap_index), + (snap_index, snap_term + 1, vec![], snap_index), + (snap_index - 1, snap_term + 1, vec![], snap_index), ( snap_index + 1, snap_term, vec![new_entry(snap_index + 1, snap_term)], - snap_index + 2, + snap_index + 1, ), ( snap_index, snap_term, vec![new_entry(snap_index + 1, snap_term)], - snap_index + 1, + snap_index, ), ( snap_index - 1, snap_term, vec![new_entry(snap_index + 1, snap_term)], - snap_index + 1, + snap_index, ), ( snap_index + 1, snap_term + 1, vec![new_entry(snap_index + 1, snap_term)], - snap_index + 1, + snap_index, ), ( snap_index, snap_term + 1, vec![new_entry(snap_index + 1, snap_term)], - snap_index + 1, + snap_index, ), ( snap_index - 1, snap_term + 1, vec![new_entry(snap_index + 1, snap_term)], - snap_index + 1, + snap_index, ), ]; - for (i, &(stablei, stablet, ref new_ents, wunstable)) in tests.iter().enumerate() { + for (i, &(stablei, stablet, ref new_ents, wpersist)) in tests.iter().enumerate() { let store = MemStorage::new(); store .wl() 
.apply_snapshot(new_snapshot(snap_index, snap_term)) .expect(""); let mut raft_log = RaftLog::new(store, l.clone()); + assert_eq!(raft_log.persisted, snap_index); raft_log.append(new_ents); - raft_log.stable_to(stablei, stablet); - if raft_log.unstable.offset != wunstable { + let unstable = raft_log.unstable_entries().to_vec(); + raft_log.stable_entries(); + raft_log.mut_store().wl().append(&unstable).expect(""); + let is_changed = raft_log.persisted != wpersist; + assert_eq!(raft_log.maybe_persist(stablei, stablet), is_changed); + if raft_log.persisted != wpersist { panic!( - "#{}: unstable = {}, want {}", - i, raft_log.unstable.offset, wunstable + "#{}: persisted = {}, want {}", + i, raft_log.persisted, wpersist ); } } - } - #[test] - fn test_stable_to() { - let l = default_logger(); - let tests = vec![(1, 1, 2), (2, 2, 3), (2, 1, 1), (3, 1, 1)]; - for (i, &(stablei, stablet, wunstable)) in tests.iter().enumerate() { - let store = MemStorage::new(); - let mut raft_log = RaftLog::new(store, l.clone()); - raft_log.append(&[new_entry(1, 1), new_entry(2, 2)]); - raft_log.stable_to(stablei, stablet); - if raft_log.unstable.offset != wunstable { - panic!( - "#{}: unstable = {}, want {}", - i, raft_log.unstable.offset, wunstable - ); - } - } + let mut raft_log = RaftLog::new(MemStorage::new(), default_logger()); + raft_log.restore(new_snapshot(100, 1)); + assert_eq!(raft_log.unstable.offset, 101); + raft_log.append(&[new_entry(101, 1)]); + assert_eq!(raft_log.term(101), Ok(1)); + // 101 == offset, should not forward persisted + assert!(!raft_log.maybe_persist(101, 1)); + raft_log.append(&[new_entry(102, 1)]); + assert_eq!(raft_log.term(102), Ok(1)); + // 102 > offset, should not forward persisted + assert!(!raft_log.maybe_persist(102, 1)); } // TestUnstableEnts ensures unstableEntries returns the unstable part of the @@ -891,11 +944,8 @@ mod test { let mut raft_log = RaftLog::new(store, l.clone()); raft_log.append(&previous_ents[(unstable - 1)..]); - let ents = 
raft_log.unstable_entries().unwrap_or(&[]).to_vec(); - let l = ents.len(); - if l > 0 { - raft_log.stable_to(ents[l - 1].index, ents[l - i].term); - } + let ents = raft_log.unstable_entries().to_vec(); + raft_log.stable_entries(); if &ents != wents { panic!("#{}: unstableEnts = {:?}, want {:?}", i, ents, wents); } @@ -908,24 +958,67 @@ mod test { } #[test] - fn test_next_ents() { + fn test_has_next_ents_and_next_ents() { let l = default_logger(); - let ents = [new_entry(4, 1), new_entry(5, 1), new_entry(6, 1)]; + let ents = [ + new_entry(4, 1), + new_entry(5, 1), + new_entry(6, 1), + new_entry(7, 1), + ]; + // applied, persisted, committed, expect_entries let tests = vec![ - (0, Some(&ents[..2])), - (3, Some(&ents[..2])), - (4, Some(&ents[1..2])), - (5, None), + (0, 3, 3, None), + (0, 3, 4, None), + (0, 4, 6, Some(&ents[..1])), + (0, 6, 4, Some(&ents[..1])), + (0, 5, 5, Some(&ents[..2])), + (0, 5, 7, Some(&ents[..2])), + (0, 7, 5, Some(&ents[..2])), + (3, 4, 3, None), + (3, 5, 5, Some(&ents[..2])), + (3, 6, 7, Some(&ents[..3])), + (3, 7, 6, Some(&ents[..3])), + (4, 5, 5, Some(&ents[1..2])), + (4, 5, 7, Some(&ents[1..2])), + (4, 7, 5, Some(&ents[1..2])), + (4, 7, 7, Some(&ents[1..4])), + (5, 5, 5, None), + (5, 7, 7, Some(&ents[2..4])), + (7, 7, 7, None), ]; - for (i, &(applied, ref expect_entries)) in tests.iter().enumerate() { + for (i, &(applied, persisted, committed, ref expect_entries)) in tests.iter().enumerate() { let store = MemStorage::new(); store.wl().apply_snapshot(new_snapshot(3, 1)).expect(""); let mut raft_log = RaftLog::new(store, l.clone()); raft_log.append(&ents); - raft_log.maybe_commit(5, 1); + let unstable = raft_log.unstable_entries().to_vec(); + raft_log.stable_entries(); + raft_log.mut_store().wl().append(&unstable).expect(""); + raft_log.maybe_persist(persisted, 1); + assert_eq!( + persisted, raft_log.persisted, + "#{}: persisted = {}, want {}", + i, raft_log.persisted, persisted + ); + raft_log.maybe_commit(committed, 1); + assert_eq!( + 
committed, raft_log.committed, + "#{}: committed = {}, want {}", + i, raft_log.committed, committed + ); #[allow(deprecated)] raft_log.applied_to(applied); + let expect_has_next = expect_entries.is_some(); + let actual_has_next = raft_log.has_next_entries(); + if actual_has_next != expect_has_next { + panic!( + "#{}: hasNext = {}, want {}", + i, actual_has_next, expect_has_next + ); + } + let next_entries = raft_log.next_entries(); if next_entries != expect_entries.map(|n| n.to_vec()) { panic!( @@ -936,28 +1029,6 @@ mod test { } } - #[test] - fn test_has_next_ents() { - let l = default_logger(); - let ents = [new_entry(4, 1), new_entry(5, 1), new_entry(6, 1)]; - let tests = vec![(0, true), (3, true), (4, true), (5, false)]; - - for (i, &(applied, has_next)) in tests.iter().enumerate() { - let store = MemStorage::new(); - store.wl().apply_snapshot(new_snapshot(3, 1)).expect(""); - let mut raft_log = RaftLog::new(store, l.clone()); - raft_log.append(&ents); - raft_log.maybe_commit(5, 1); - #[allow(deprecated)] - raft_log.applied_to(applied); - - let actual_has_next = raft_log.has_next_entries(); - if actual_has_next != has_next { - panic!("#{}: hasNext = {}, want {}", i, actual_has_next, has_next); - } - } - } - #[test] fn test_slice() { let (offset, num) = (100u64, 100u64); @@ -1093,15 +1164,15 @@ mod test { /// If the given (index, term) matches with the existing log: /// 1. If an existing entry conflicts with a new one (same index /// but different terms), delete the existing entry and all that - /// follow it - /// 2.Append any new entries not already in the log + /// follow it and decrease the persisted + /// 2. 
Append any new entries not already in the log /// If the given (index, term) does not match with the existing log: /// return false #[test] fn test_log_maybe_append() { let l = default_logger(); let previous_ents = vec![new_entry(1, 1), new_entry(2, 2), new_entry(3, 3)]; - let (last_index, last_term, commit) = (3u64, 3u64, 1u64); + let (last_index, last_term, commit, persist) = (3u64, 3u64, 1u64, 3u64); let tests = vec![ // not match: term is different @@ -1112,6 +1183,7 @@ mod test { vec![new_entry(last_index + 1, 4)], None, commit, + persist, false, ), // not match: index out of bound @@ -1122,6 +1194,7 @@ mod test { vec![new_entry(last_index + 2, 4)], None, commit, + persist, false, ), // match with the last existing entry @@ -1132,8 +1205,10 @@ mod test { vec![], Some(last_index), last_index, + persist, false, ), + // do not increase commit higher than lastnewi ( last_term, last_index, @@ -1141,8 +1216,10 @@ mod test { vec![], Some(last_index), last_index, + persist, false, - ), // do not increase commit higher than lastnewi + ), + // commit up to the commit in the message ( last_term, last_index, @@ -1150,8 +1227,10 @@ mod test { vec![], Some(last_index), last_index - 1, + persist, false, - ), // commit up to the commit in the message + ), + // commit do not decrease ( last_term, last_index, @@ -1159,9 +1238,11 @@ mod test { vec![], Some(last_index), commit, + persist, false, - ), // commit do not decrease - (0, 0, last_index, vec![], Some(0), commit, false), // commit do not decrease + ), + // commit do not decrease + (0, 0, last_index, vec![], Some(0), commit, persist, false), ( last_term, last_index, @@ -1169,6 +1250,7 @@ mod test { vec![new_entry(last_index + 1, 4)], Some(last_index + 1), last_index, + persist, false, ), ( @@ -1178,8 +1260,10 @@ mod test { vec![new_entry(last_index + 1, 4)], Some(last_index + 1), last_index + 1, + persist, false, ), + // do not increase commit higher than lastnewi ( last_term, last_index, @@ -1187,9 +1271,9 @@ mod test { 
vec![new_entry(last_index + 1, 4)], Some(last_index + 1), last_index + 1, + persist, false, - ), // do not increase commit higher than - // lastnewi + ), ( last_term, last_index, @@ -1197,6 +1281,7 @@ mod test { vec![new_entry(last_index + 1, 4), new_entry(last_index + 2, 4)], Some(last_index + 2), last_index + 2, + persist, false, ), // match with the the entry in the middle @@ -1207,6 +1292,7 @@ mod test { vec![new_entry(last_index, 4)], Some(last_index), last_index, + cmp::min(last_index - 1, persist), false, ), ( @@ -1216,8 +1302,10 @@ mod test { vec![new_entry(last_index - 1, 4)], Some(last_index - 1), last_index - 1, + cmp::min(last_index - 2, persist), false, ), + // conflict with existing committed entry ( last_term - 3, last_index - 3, @@ -1225,8 +1313,9 @@ mod test { vec![new_entry(last_index - 2, 4)], Some(last_index - 2), last_index - 2, + cmp::min(last_index - 3, persist), true, - ), // conflict with existing committed entry + ), ( last_term - 2, last_index - 2, @@ -1234,17 +1323,33 @@ mod test { vec![new_entry(last_index - 1, 4), new_entry(last_index, 4)], Some(last_index), last_index, + cmp::min(last_index - 2, persist), + false, + ), + ( + last_term - 2, + last_index - 2, + last_index + 2, + vec![ + new_entry(last_index - 1, last_term - 1), + new_entry(last_index, 4), + new_entry(last_index + 1, 4), + ], + Some(last_index + 1), + last_index + 1, + cmp::min(last_index - 1, persist), false, ), ]; - for (i, &(log_term, index, committed, ref ents, wlasti, wcommit, wpanic)) in + for (i, &(log_term, index, committed, ref ents, wlasti, wcommit, wpersist, wpanic)) in tests.iter().enumerate() { let store = MemStorage::new(); let mut raft_log = RaftLog::new(store, l.clone()); raft_log.append(&previous_ents); raft_log.committed = commit; + raft_log.persisted = persist; let res = panic::catch_unwind(AssertUnwindSafe(|| { raft_log .maybe_append(index, log_term, committed, ents) @@ -1258,12 +1363,16 @@ mod test { } let glasti = res.unwrap(); let gcommitted = 
raft_log.committed; + let gpersisted = raft_log.persisted; if glasti != wlasti { panic!("#{}: lastindex = {:?}, want {:?}", i, glasti, wlasti); } if gcommitted != wcommit { panic!("#{}: committed = {}, want {}", i, gcommitted, wcommit); } + if gpersisted != wpersist { + panic!("#{}: persisted = {}, want {}", i, gpersisted, wpersist); + } let ents_len = ents.len() as u64; if glasti.is_some() && ents_len != 0 { let (from, to) = ( @@ -1402,4 +1511,43 @@ mod test { } } } + + #[test] + fn test_restore_snap() { + let store = MemStorage::new(); + store.wl().apply_snapshot(new_snapshot(100, 1)).expect(""); + let mut raft_log = RaftLog::new(store, default_logger()); + assert_eq!(raft_log.committed, 100); + assert_eq!(raft_log.persisted, 100); + raft_log.restore(new_snapshot(200, 1)); + assert_eq!(raft_log.committed, 200); + assert_eq!(raft_log.persisted, 200); + + for i in 201..210 { + raft_log.append(&[new_entry(i, 1)]); + } + raft_log + .mut_store() + .wl() + .apply_snapshot(new_snapshot(200, 1)) + .expect(""); + raft_log.stable_snap(); + let unstable = raft_log.unstable_entries().to_vec(); + raft_log.stable_entries(); + raft_log.mut_store().wl().append(&unstable).expect(""); + raft_log.maybe_persist(209, 1); + + assert_eq!(raft_log.persisted, 209); + + raft_log.restore(new_snapshot(205, 1)); + assert_eq!(raft_log.committed, 205); + // persisted should backward to 205 + assert_eq!(raft_log.persisted, 205); + + // use smaller commit index, should panic + assert!( + panic::catch_unwind(AssertUnwindSafe(|| raft_log.restore(new_snapshot(204, 1)))) + .is_err() + ); + } } diff --git a/src/raw_node.rs b/src/raw_node.rs index 572331fd4..29a07d6bf 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -20,15 +20,15 @@ //! nodes but not the raft consensus itself. Generally, you'll interact with the //! RawNode first and use it to access the inner workings of the consensus protocol. 
-use std::mem; +use std::{collections::VecDeque, mem}; use protobuf::Message as PbMessage; use raft_proto::ConfChangeI; -use crate::config::Config; use crate::eraftpb::{ConfState, Entry, EntryType, HardState, Message, MessageType, Snapshot}; use crate::errors::{Error, Result}; use crate::read_only::ReadState; +use crate::{config::Config, StateRole}; use crate::{Raft, SoftState, Status, Storage}; use slog::Logger; @@ -82,9 +82,10 @@ pub fn is_empty_snap(s: &Snapshot) -> bool { /// Ready encapsulates the entries and messages that are ready to read, /// be saved to stable storage, committed or sent to other peers. -/// All fields in Ready are read-only. #[derive(Default, Debug, PartialEq)] pub struct Ready { + number: u64, + ss: Option, hs: Option, @@ -95,99 +96,92 @@ pub struct Ready { snapshot: Snapshot, - /// CommittedEntries specifies entries to be committed to a - /// store/state-machine. These have previously been committed to stable - /// store. - pub committed_entries: Option>, - - /// Messages specifies outbound messages to be sent AFTER Entries are - /// committed to stable storage. - /// If it contains a MsgSnap message, the application MUST report back to raft - /// when the snapshot has been received or has failed by calling ReportSnapshot. 
- pub messages: Vec, + light: LightReady, must_sync: bool, } impl Ready { - fn new( - raft: &mut Raft, - prev_ss: &SoftState, - prev_hs: &HardState, - since_idx: Option, - ) -> Ready { - let mut rd = Ready { - entries: raft.raft_log.unstable_entries().unwrap_or(&[]).to_vec(), - ..Default::default() - }; - if !raft.msgs.is_empty() { - mem::swap(&mut raft.msgs, &mut rd.messages); - } - rd.committed_entries = Some( - (match since_idx { - None => raft.raft_log.next_entries(), - Some(idx) => raft.raft_log.next_entries_since(idx), - }) - .unwrap_or_else(Vec::new), - ); - let ss = raft.soft_state(); - if &ss != prev_ss { - rd.ss = Some(ss); - } - let hs = raft.hard_state(); - if &hs != prev_hs { - if hs.vote != prev_hs.vote || hs.term != prev_hs.term || !rd.entries.is_empty() { - rd.must_sync = true; - } - rd.hs = Some(hs); - } - if raft.raft_log.unstable.snapshot.is_some() { - rd.snapshot = raft.raft_log.unstable.snapshot.clone().unwrap(); - } - if !raft.read_states.is_empty() { - rd.read_states = raft.read_states.clone(); - } - rd + /// The number of current Ready. + /// It is used for identifying the different Ready and ReadyRecord. + #[inline] + pub fn number(&self) -> u64 { + self.number } /// The current volatile state of a Node. - /// SoftState will be nil if there is no update. + /// SoftState will be None if there is no update. /// It is not required to consume or store SoftState. #[inline] pub fn ss(&self) -> Option<&SoftState> { self.ss.as_ref() } - /// The current state of a Node to be saved to stable storage BEFORE - /// Messages are sent. - /// HardState will be equal to empty state if there is no update. + /// The current state of a Node to be saved to stable storage. + /// HardState will be None state if there is no update. #[inline] pub fn hs(&self) -> Option<&HardState> { self.hs.as_ref() } - /// States can be used for node to serve linearizable read requests locally - /// when its applied index is greater than the index in ReadState. 
- /// Note that the read_state will be returned when raft receives MsgReadIndex. - /// The returned is only valid for the request that requested to read. + /// ReadStates specifies the state for read only query. #[inline] - pub fn read_states(&self) -> &[ReadState] { + pub fn read_states(&self) -> &Vec { &self.read_states } - /// Entries specifies entries to be saved to stable storage BEFORE - /// Messages are sent. + /// Take the ReadStates. + #[inline] + pub fn take_read_states(&mut self) -> Vec { + mem::take(&mut self.read_states) + } + + /// Entries specifies entries to be saved to stable storage. #[inline] - pub fn entries(&self) -> &[Entry] { + pub fn entries(&self) -> &Vec { &self.entries } + /// Take the Entries. + #[inline] + pub fn take_entries(&mut self) -> Vec { + mem::take(&mut self.entries) + } + /// Snapshot specifies the snapshot to be saved to stable storage. #[inline] pub fn snapshot(&self) -> &Snapshot { &self.snapshot } + /// CommittedEntries specifies entries to be committed to a + /// store/state-machine. These have previously been committed to stable + /// store. + #[inline] + pub fn committed_entries(&self) -> &Vec { + self.light.committed_entries() + } + + /// Take the CommitEntries. + #[inline] + pub fn take_committed_entries(&mut self) -> Vec { + self.light.take_committed_entries() + } + + /// Messages specifies outbound messages to be sent. + /// If it contains a MsgSnap message, the application MUST report back to raft + /// when the snapshot has been received or has failed by calling ReportSnapshot. + #[inline] + pub fn messages(&self) -> &Vec> { + self.light.messages() + } + + /// Take the Messages. + #[inline] + pub fn take_messages(&mut self) -> Vec> { + self.light.take_messages() + } + /// MustSync indicates whether the HardState and Entries must be synchronously /// written to disk or if an asynchronous write is permissible. 
#[inline] @@ -196,6 +190,63 @@ impl Ready { } } +/// ReadyRecord encapsulates some needed data from the corresponding Ready. +#[derive(Default, Debug, PartialEq)] +struct ReadyRecord { + number: u64, + // (index, term) of the last entry from the entries in Ready + last_entry: Option<(u64, u64)>, + has_snapshot: bool, + messages: Vec, +} + +/// LightReady encapsulates the commit index, committed entries and +/// messages that are ready to be applied or be sent to other peers. +#[derive(Default, Debug, PartialEq)] +pub struct LightReady { + commit_index: Option, + committed_entries: Vec, + messages: Vec>, +} + +impl LightReady { + /// The current commit index. + /// It will be None state if there is no update. + /// It is not required to save it to stable storage. + #[inline] + pub fn commit_index(&self) -> Option { + self.commit_index + } + + /// CommittedEntries specifies entries to be committed to a + /// store/state-machine. These have previously been committed to stable + /// store. + #[inline] + pub fn committed_entries(&self) -> &Vec { + &self.committed_entries + } + + /// Take the CommitEntries. + #[inline] + pub fn take_committed_entries(&mut self) -> Vec { + mem::take(&mut self.committed_entries) + } + + /// Messages specifies outbound messages to be sent. + /// If it contains a MsgSnap message, the application MUST report back to raft + /// when the snapshot has been received or has failed by calling ReportSnapshot. + #[inline] + pub fn messages(&self) -> &Vec> { + &self.messages + } + + /// Take the Messages. + #[inline] + pub fn take_messages(&mut self) -> Vec> { + mem::take(&mut self.messages) + } +} + /// RawNode is a thread-unsafe Node. /// The methods of this struct correspond to the methods of Node and are described /// more fully there. @@ -204,6 +255,13 @@ pub struct RawNode { pub raft: Raft, prev_ss: SoftState, prev_hs: HardState, + // Current max number of Record and ReadyRecord. 
+ max_number: u64, + records: VecDeque, + // Index which the given committed entries should start from. + commit_since_index: u64, + // Messages that need to be sent to other peers. + messages: Vec>, } impl RawNode { @@ -216,6 +274,10 @@ impl RawNode { raft: r, prev_hs: Default::default(), prev_ss: Default::default(), + max_number: 0, + records: VecDeque::new(), + commit_since_index: config.applied, + messages: Vec::new(), }; rn.prev_hs = rn.raft.hard_state(); rn.prev_ss = rn.raft.soft_state(); @@ -242,38 +304,6 @@ impl RawNode { self.raft.set_priority(priority); } - fn commit_ready(&mut self, rd: Ready) { - if rd.ss.is_some() { - self.prev_ss = rd.ss.unwrap(); - } - if let Some(e) = rd.hs { - if e != HardState::default() { - self.prev_hs = e; - } - } - if !rd.entries.is_empty() { - let e = rd.entries.last().unwrap(); - self.raft.raft_log.stable_to(e.index, e.term); - } - if rd.snapshot != Snapshot::default() { - self.raft - .raft_log - .stable_snap_to(rd.snapshot.get_metadata().index); - } - if !rd.read_states.is_empty() { - self.raft.read_states.clear(); - } - // update raft uncommitted entries size - if rd.committed_entries.is_some() { - self.raft - .reduce_uncommitted_size(&rd.committed_entries.unwrap_or_default()) - } - } - - fn commit_apply(&mut self, applied: u64) { - self.raft.commit_apply(applied); - } - /// Tick advances the internal logical clock by a single tick. /// /// Returns true to indicate that there will probably be some readiness which @@ -339,7 +369,7 @@ impl RawNode { /// Step advances the state machine using the given message. pub fn step(&mut self, m: Message) -> Result<()> { - // ignore unexpected local messages receiving over network + // Ignore unexpected local messages receiving over network if is_local_msg(m.get_msg_type()) { return Err(Error::StepLocalMsg); } @@ -349,94 +379,276 @@ impl RawNode { Err(Error::StepPeerNotFound) } - /// Given an index, creates a new Ready value from that index. 
- pub fn ready_since(&mut self, applied_idx: u64) -> Ready { - Ready::new( - &mut self.raft, - &self.prev_ss, - &self.prev_hs, - Some(applied_idx), - ) + /// Generates a LightReady that has the committed entries and messages but no commit index. + fn gen_light_ready(&mut self) -> LightReady { + let mut rd = LightReady::default(); + let raft = &mut self.raft; + rd.committed_entries = raft + .raft_log + .next_entries_since(self.commit_since_index) + .unwrap_or_default(); + // Update raft uncommitted entries size + raft.reduce_uncommitted_size(&rd.committed_entries); + if let Some(e) = rd.committed_entries.last() { + assert!(self.commit_since_index < e.get_index()); + self.commit_since_index = e.get_index(); + } + + if !self.messages.is_empty() { + mem::swap(&mut rd.messages, &mut self.messages); + } + + if raft.state == StateRole::Leader && !raft.msgs.is_empty() { + // Leader can send messages immediately to make replication concurrently. + // For more details, check raft thesis 10.2.1. + rd.messages.push(mem::take(&mut raft.msgs)); + } + + rd } - /// Ready returns the current point-in-time state of this RawNode. + /// Returns the outstanding work that the application needs to handle. + /// + /// This includes appending and applying entries or a snapshot, updating the HardState, + /// and sending messages. The returned `Ready` *MUST* be handled and subsequently + /// passed back via advance() or its families. + /// + /// `has_ready` should be called first to check if it's necessary to handle the ready. 
pub fn ready(&mut self) -> Ready { - Ready::new(&mut self.raft, &self.prev_ss, &self.prev_hs, None) + let raft = &mut self.raft; + + self.max_number += 1; + let mut rd = Ready { + number: self.max_number, + ..Default::default() + }; + let mut rd_record = ReadyRecord { + number: self.max_number, + ..Default::default() + }; + + if self.prev_ss.raft_state != StateRole::Leader && raft.state == StateRole::Leader { + // The vote msg which makes this peer become leader has been sent after persisting. + // So the remaining records must be generated during being candidate which can not + // have last_entry and snapshot(if so, it should become follower). The only things + // left are messages and they can be sent without persisting because no key data + // changes (term, vote, entry). These messages should be added before raft.msgs to + // avoid out of order. + for record in self.records.drain(..) { + assert_eq!(record.last_entry, None); + assert!(!record.has_snapshot); + if !record.messages.is_empty() { + self.messages.push(record.messages); + } + } + } + + let ss = raft.soft_state(); + if ss != self.prev_ss { + rd.ss = Some(ss); + } + let hs = raft.hard_state(); + if hs != self.prev_hs { + if hs.vote != self.prev_hs.vote || hs.term != self.prev_hs.term { + rd.must_sync = true; + } + rd.hs = Some(hs); + } + + if !raft.read_states.is_empty() { + mem::swap(&mut rd.read_states, &mut raft.read_states); + } + + if let Some(snapshot) = &raft.raft_log.unstable_snapshot() { + rd.snapshot = snapshot.clone(); + assert!(self.commit_since_index <= rd.snapshot.get_metadata().index); + self.commit_since_index = rd.snapshot.get_metadata().index; + // If there is a snapshot, the latter entries can not be persisted + // so there is no committed entries. 
+ assert!( + !raft + .raft_log + .has_next_entries_since(self.commit_since_index), + "has snapshot but also has committed entries since {}", + self.commit_since_index + ); + rd_record.has_snapshot = true; + rd.must_sync = true; + } + + rd.entries = raft.raft_log.unstable_entries().to_vec(); + if let Some(e) = rd.entries.last() { + // If the last entry exists, the entries must not empty, vice versa. + rd.must_sync = true; + rd_record.last_entry = Some((e.get_index(), e.get_term())); + } + + if !raft.msgs.is_empty() && raft.state != StateRole::Leader { + mem::swap(&mut rd_record.messages, &mut raft.msgs); + } + + rd.light = self.gen_light_ready(); + self.records.push_back(rd_record); + rd } - /// Given an index, can determine if there is a ready state from that time. - pub fn has_ready_since(&self, applied_idx: Option) -> bool { + /// HasReady called when RawNode user need to check if any Ready pending. + pub fn has_ready(&self) -> bool { let raft = &self.raft; - if !raft.msgs.is_empty() || raft.raft_log.unstable_entries().is_some() { + if !raft.msgs.is_empty() || !self.messages.is_empty() { return true; } - if !raft.read_states.is_empty() { + + if raft.soft_state() != self.prev_ss { return true; } - if self.snap().map_or(false, |s| !s.is_empty()) { + if raft.hard_state() != self.prev_hs { return true; } - let has_unapplied_entries = match applied_idx { - None => raft.raft_log.has_next_entries(), - Some(idx) => raft.raft_log.has_next_entries_since(idx), - }; - if has_unapplied_entries { + + if !raft.read_states.is_empty() { return true; } - if raft.soft_state() != self.prev_ss { + + if !raft.raft_log.unstable_entries().is_empty() { return true; } - let hs = raft.hard_state(); - if hs != HardState::default() && hs != self.prev_hs { + + if self.snap().map_or(false, |s| !s.is_empty()) { return true; } + + if raft + .raft_log + .has_next_entries_since(self.commit_since_index) + { + return true; + } + false } - /// HasReady called when RawNode user need to check if any 
Ready pending. - /// Checking logic in this method should be consistent with Ready.containsUpdates(). - #[inline] - pub fn has_ready(&self) -> bool { - self.has_ready_since(None) + fn commit_ready(&mut self, rd: Ready) { + if let Some(ss) = rd.ss { + self.prev_ss = ss; + } + if let Some(hs) = rd.hs { + self.prev_hs = hs; + } + let rd_record = self.records.back().unwrap(); + assert!(rd_record.number == rd.number); + let raft = &mut self.raft; + if rd_record.has_snapshot { + raft.raft_log.stable_snap(); + } + if rd_record.last_entry.is_some() { + raft.raft_log.stable_entries(); + } } - /// Grabs the snapshot from the raft if available. - #[inline] - pub fn snap(&self) -> Option<&Snapshot> { - self.raft.snap() + fn commit_apply(&mut self, applied: u64) { + self.raft.commit_apply(applied); + } + + /// Notifies that the ready of this number has been persisted. + /// + /// Since Ready must be persisted in order, calling this function implicitly means + /// all readies with numbers smaller than this one have been persisted. + /// + /// `has_ready` and `ready` should be called later to handle further updates that become + /// valid after ready being persisted. + pub fn on_persist_ready(&mut self, number: u64) { + let (mut index, mut term) = (0, 0); + while let Some(record) = self.records.front() { + if record.number > number { + break; + } + let mut record = self.records.pop_front().unwrap(); + + if let Some((i, t)) = record.last_entry { + index = i; + term = t; + } + + if !record.messages.is_empty() { + self.messages.push(mem::take(&mut record.messages)); + } + } + if term != 0 { + self.raft.on_persist_entries(index, term); + } } - /// Advance notifies the RawNode that the application has applied and saved progress in the - /// last Ready results. 
- pub fn advance(&mut self, rd: Ready) { - self.advance_append(rd); - let commit_idx = self.prev_hs.commit; - if commit_idx != 0 { - // In most cases, prevHardSt and rd.HardState will be the same - // because when there are new entries to apply we just sent a - // HardState with an updated Commit value. However, on initial - // startup the two are different because we don't send a HardState - // until something changes, but we do send any un-applied but - // committed entries (and previously-committed entries may be - // incorporated into the snapshot, even if rd.CommittedEntries is - // empty). Therefore we mark all committed entries as applied - // whether they were included in rd.HardState or not. - self.advance_apply(commit_idx); + /// Advances the ready after fully processing it. + /// + /// Fully processing a ready requires to persist snapshot, entries and hard states, apply all + /// committed entries, send all messages. + /// + /// Returns the LightReady that contains commit index, committed entries and messages. `LightReady` + /// contains updates that only valid after persisting last ready. It should also be fully processed. + /// Then `advance_apply` or `advance_apply_to` should be used later to update applying progress. + pub fn advance(&mut self, rd: Ready) -> LightReady { + let applied = self.commit_since_index; + let light_rd = self.advance_append(rd); + self.advance_apply_to(applied); + light_rd + } + + /// Advances the ready without applying committed entries. `advance_apply` or `advance_apply_to` + /// should be used later to update applying progress. + /// + /// Returns the LightReady that contains commit index, committed entries and messages. + /// + /// Since Ready must be persisted in order, calling this function implicitly means + /// all readys collected before have been persisted. 
+ #[inline] + pub fn advance_append(&mut self, rd: Ready) -> LightReady { + self.commit_ready(rd); + self.on_persist_ready(self.max_number); + let mut light_rd = self.gen_light_ready(); + // Set commit index if it's updated + let hard_state = self.raft.hard_state(); + if hard_state.commit > self.prev_hs.commit { + light_rd.commit_index = Some(hard_state.commit); + self.prev_hs.commit = hard_state.commit; + } else { + assert!(hard_state.commit == self.prev_hs.commit); + light_rd.commit_index = None; } + assert_eq!(hard_state, self.prev_hs, "hard state != prev_hs",); + light_rd } - /// Appends and commits the ready value. + /// Same as `advance_append` except that it allows to only store the updates in cache. `on_persist_ready` + /// should be used later to update the persisting progress. + /// + /// Raft works on an assumption persisted updates should not be lost, which usually requires expensive + /// operations like `fsync`. `advance_append_async` allows you to control the rate of such operations and + /// get a reasonable batch size. However, it's still required that the updates can be read by raft from the + /// `Storage` trait before calling `advance_append_async`. #[inline] - pub fn advance_append(&mut self, rd: Ready) { + pub fn advance_append_async(&mut self, rd: Ready) { self.commit_ready(rd); } + /// Advance apply to the index of the last committed entries given before. + #[inline] + pub fn advance_apply(&mut self) { + self.commit_apply(self.commit_since_index); + } + /// Advance apply to the passed index. #[inline] - pub fn advance_apply(&mut self, applied: u64) { + pub fn advance_apply_to(&mut self, applied: u64) { self.commit_apply(applied); } + /// Grabs the snapshot from the raft if available. + #[inline] + pub fn snap(&self) -> Option<&Snapshot> { + self.raft.snap() + } + /// Status returns the current status of the given group. #[inline] pub fn status(&self) -> Status {