Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support asynchronous ready #403

Merged
merged 46 commits into from
Nov 26, 2020
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b196b8a
change ready to async
gengliqi Sep 22, 2020
a41d502
fix
gengliqi Sep 22, 2020
bc83652
add commit_since_index
gengliqi Oct 14, 2020
7b0825f
refine ready interface
gengliqi Oct 14, 2020
683c593
update doc
gengliqi Oct 15, 2020
8b87abc
Merge branch 'master' into async-ready-v2
gengliqi Oct 15, 2020
fbdd8ba
address comment
gengliqi Oct 16, 2020
d58b237
fix bench
gengliqi Oct 20, 2020
8be9038
change mem cmp to take
gengliqi Oct 21, 2020
6930aae
Merge branch 'master' into async-ready-v2
hicqu Oct 21, 2020
9da4829
remove unstable entries copy in ready
gengliqi Oct 22, 2020
d72d121
add commit index to PersistLastReadyResult
gengliqi Oct 26, 2020
6c3b23e
update example
gengliqi Nov 3, 2020
f864bc2
remove forward commit index to applied index
gengliqi Nov 4, 2020
681df23
merge master
gengliqi Nov 4, 2020
cd17ad1
add some tests
gengliqi Nov 5, 2020
347c39a
fix persist_idx bug and move it into raft_log
gengliqi Nov 5, 2020
c2c7183
update annotations
gengliqi Nov 5, 2020
45ccd2b
add async ready follower test
gengliqi Nov 5, 2020
22ab781
address comments
gengliqi Nov 9, 2020
19113f8
address comments
gengliqi Nov 9, 2020
43d80a0
add more unit tests
gengliqi Nov 11, 2020
4b9a764
fix the order of handling ready
gengliqi Nov 11, 2020
b223a2d
make maybe_persist more robust
gengliqi Nov 12, 2020
b130b96
update annotations
gengliqi Nov 13, 2020
df1b8e2
step back to copy unstable entries
gengliqi Nov 16, 2020
e0cf385
Merge branch 'master' into async-ready-v2
gengliqi Nov 16, 2020
ef0ddea
update the persisted
gengliqi Nov 16, 2020
e2a8b44
add restore test
gengliqi Nov 17, 2020
a4362e4
add tests and more annotation
gengliqi Nov 19, 2020
d93ea8b
remove on_persist_snap
gengliqi Nov 19, 2020
4845a8d
pending snapshot can get ready
gengliqi Nov 19, 2020
88ed0c0
raw_node: cleanup API (#1)
BusyJay Nov 20, 2020
2afc452
update
gengliqi Nov 20, 2020
a4ab152
add test_async_ready_become_leader test
gengliqi Nov 20, 2020
efd603b
add annotations
gengliqi Nov 23, 2020
f72acb8
update invariant
gengliqi Nov 23, 2020
6d39386
remove on_persist_last_ready
gengliqi Nov 23, 2020
fbcf0f3
add annotations
gengliqi Nov 23, 2020
c17288d
remove the empty test
gengliqi Nov 23, 2020
fee8bb6
update test_has_next_ents_and_next_ents
gengliqi Nov 23, 2020
3617233
update lib.rs
gengliqi Nov 24, 2020
c37cb15
update doc
gengliqi Nov 24, 2020
f958c38
address comments
gengliqi Nov 24, 2020
c93e1e1
format
gengliqi Nov 24, 2020
de12bc2
update annotations
gengliqi Nov 25, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions benches/suites/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,14 @@ fn test_ready_raft_node(logger: &slog::Logger) -> RawNode<MemStorage> {
let mut node = quick_raw_node(logger);
node.raft.become_candidate();
node.raft.become_leader();
node.raft
.raft_log
.store
.wl()
.append(&node.raft.raft_log.unstable.entries)
.expect("");
node.raft.raft_log.stable_to(1, 1);
node.raft.on_persist_entries(1, 1);
node.raft.commit_apply(1);
let mut entries = vec![];
for i in 1..101 {
Expand All @@ -114,11 +121,11 @@ fn test_ready_raft_node(logger: &slog::Logger) -> RawNode<MemStorage> {
e.term = 1;
entries.push(e);
}
let mut unstable_entries = entries.clone();
node.raft.append_entry(&mut entries);
node.raft.raft_log.store.wl().append(&entries).expect("");
node.raft.raft_log.unstable.offset = 102;
// This increases 'committed_index' to `last_index` because there is only one node in quorum.
node.raft.append_entry(&mut unstable_entries);
node.raft
.on_persist_entries(node.raft.raft_log.last_index(), 1);

let mut snap = Snapshot::default();
snap.set_data(vec![0; 8 * 1024 * 1024]);
Expand All @@ -128,6 +135,6 @@ fn test_ready_raft_node(logger: &slog::Logger) -> RawNode<MemStorage> {
node.raft.msgs.push(Message::default());
}
// Force reverting committed index to provide us some entries to be stored from next `Ready`
node.raft.raft_log.committed = 101;
node.raft.raft_log.committed = 1;
node
}
103 changes: 58 additions & 45 deletions examples/five_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,28 @@ fn on_ready(
// Get the `Ready` with `RawNode::ready` interface.
let mut ready = raft_group.ready();

let handle_messages = |msgs: Vec<Vec<Message>>| {
for vec_msg in msgs {
for msg in vec_msg {
let to = msg.to;
if mailboxes[&to].send(msg).is_err() {
error!(
logger,
"send raft message to {} fail, let Raft retry it", to
);
}
}
}
};

// Send out the messages come from the node.
handle_messages(ready.take_messages());

if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
store.wl().set_hardstate(hs.clone());
}

// Persistent raft logs. It's necessary because in `RawNode::advance` we stabilize
// raft logs to the latest position.
if let Err(e) = store.wl().append(ready.entries()) {
Expand All @@ -268,54 +290,45 @@ fn on_ready(
}
}

// Send out the messages come from the node.
for msg in ready.messages.drain(..) {
let to = msg.to;
if mailboxes[&to].send(msg).is_err() {
error!(
logger,
"send raft message to {} fail, let Raft retry it", to
);
}
}

// Apply all committed proposals.
if let Some(committed_entries) = ready.committed_entries.take() {
for entry in &committed_entries {
if entry.data.is_empty() {
// From new elected leaders.
continue;
}
if let EntryType::EntryConfChange = entry.get_entry_type() {
// For conf change messages, make them effective.
let mut cc = ConfChange::default();
cc.merge_from_bytes(&entry.data).unwrap();
let cs = raft_group.apply_conf_change(&cc).unwrap();
store.wl().set_conf_state(cs);
} else {
// For normal proposals, extract the key-value pair and then
// insert them into the kv engine.
let data = str::from_utf8(&entry.data).unwrap();
let reg = Regex::new("put ([0-9]+) (.+)").unwrap();
if let Some(caps) = reg.captures(&data) {
kv_pairs.insert(caps[1].parse().unwrap(), caps[2].to_string());
let mut handle_committed_entries =
|rn: &mut RawNode<MemStorage>, committed_entries: Vec<Entry>| {
for entry in committed_entries {
if entry.data.is_empty() {
// From new elected leaders.
continue;
}
if let EntryType::EntryConfChange = entry.get_entry_type() {
// For conf change messages, make them effective.
let mut cc = ConfChange::default();
cc.merge_from_bytes(&entry.data).unwrap();
let cs = rn.apply_conf_change(&cc).unwrap();
store.wl().set_conf_state(cs);
} else {
// For normal proposals, extract the key-value pair and then
// insert them into the kv engine.
let data = str::from_utf8(&entry.data).unwrap();
let reg = Regex::new("put ([0-9]+) (.+)").unwrap();
if let Some(caps) = reg.captures(&data) {
kv_pairs.insert(caps[1].parse().unwrap(), caps[2].to_string());
}
}
if rn.raft.state == StateRole::Leader {
// The leader should response to the clients, tell them if their proposals
// succeeded or not.
let proposal = proposals.lock().unwrap().pop_front().unwrap();
proposal.propose_success.send(true).unwrap();
}
}
if raft_group.raft.state == StateRole::Leader {
// The leader should response to the clients, tell them if their proposals
// succeeded or not.
let proposal = proposals.lock().unwrap().pop_front().unwrap();
proposal.propose_success.send(true).unwrap();
}
}
if let Some(last_committed) = committed_entries.last() {
let mut s = store.wl();
s.mut_hard_state().commit = last_committed.index;
s.mut_hard_state().term = last_committed.term;
}
}
};
// Apply all committed entries.
handle_committed_entries(raft_group, ready.take_committed_entries());

// Call `RawNode::advance` interface to update position flags in the raft.
raft_group.advance(ready);
let mut res = raft_group.advance(ready);
// Send out the messages
handle_messages(res.take_messages());
// Apply all committed entries.
handle_committed_entries(raft_group, res.take_committed_entries());
}

fn example_config() -> Config {
Expand Down
70 changes: 33 additions & 37 deletions examples/single_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,52 +102,43 @@ fn main() {
}
}

fn on_ready(r: &mut RawNode<MemStorage>, cbs: &mut HashMap<u8, ProposeCallback>) {
if !r.has_ready() {
fn on_ready(raft_group: &mut RawNode<MemStorage>, cbs: &mut HashMap<u8, ProposeCallback>) {
if !raft_group.has_ready() {
return;
}
let store = raft_group.raft.raft_log.store.clone();

// The Raft is ready, we can do something now.
let mut ready = r.ready();
// Get the `Ready` with `RawNode::ready` interface.
let mut ready = raft_group.ready();

let is_leader = r.raft.leader_id == r.raft.id;
if is_leader {
// If the peer is leader, the leader can send messages to other followers ASAP.
let msgs = ready.messages.drain(..);
for _msg in msgs {
// Here we only have one peer, so can ignore this.
let handle_messages = |msgs: Vec<Vec<Message>>| {
for vec_msg in msgs {
for _msg in vec_msg {
// Send messages to other peers.
}
}
}

if !ready.snapshot().is_empty() {
// This is a snapshot, we need to apply the snapshot at first.
r.mut_store()
.wl()
.apply_snapshot(ready.snapshot().clone())
.unwrap();
}
};

if !ready.entries().is_empty() {
// Append entries to the Raft log
r.mut_store().wl().append(ready.entries()).unwrap();
}
// Send out the messages come from the node.
handle_messages(ready.take_messages());

if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
r.mut_store().wl().set_hardstate(hs.clone());
store.wl().set_hardstate(hs.clone());
}

if !is_leader {
// If not leader, the follower needs to reply the messages to
// the leader after appending Raft entries.
let msgs = ready.messages.drain(..);
for _msg in msgs {
// Send messages to other peers.
}
if !ready.entries().is_empty() {
// Append entries to the Raft log.
store.wl().append(&ready.entries()).unwrap();
}

if let Some(committed_entries) = ready.committed_entries.take() {
let mut _last_apply_index = 0;
if !ready.snapshot().is_empty() {
// This is a snapshot, we need to apply the snapshot at first.
store.wl().apply_snapshot(ready.snapshot().clone()).unwrap();
}

let mut _last_apply_index = 0;
let mut handle_committed_entries = |committed_entries: Vec<Entry>| {
for entry in committed_entries {
// Mostly, you need to save the last apply index to resume applying
// after restart. Here we just ignore this because we use a Memory storage.
Expand All @@ -166,10 +157,15 @@ fn on_ready(r: &mut RawNode<MemStorage>, cbs: &mut HashMap<u8, ProposeCallback>)

// TODO: handle EntryConfChange
}
}

// Advance the Raft
r.advance(ready);
};
handle_committed_entries(ready.take_committed_entries());

// Advance the Raft.
let mut res = raft_group.advance(ready);
// Send out the messages.
handle_messages(res.take_messages());
// Apply all committed entries.
handle_committed_entries(res.take_committed_entries());
}

fn send_propose(logger: Logger, sender: mpsc::Sender<Msg>) {
Expand Down
15 changes: 14 additions & 1 deletion harness/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ impl Interface {
/// Step the raft, if it exists.
pub fn step(&mut self, m: Message) -> Result<()> {
match self.raft {
Some(_) => Raft::step(self, m),
Some(_) => {
let res = Raft::step(self, m);
self.persist_entries();
res
}
None => Ok(()),
}
}
Expand All @@ -52,6 +56,15 @@ impl Interface {
None => vec![],
}
}

/// Persist the raft entries
pub fn persist_entries(&mut self) {
if self.raft.is_some() {
let last_index = self.raft_log.last_index();
let last_term = self.raft_log.last_term();
self.on_persist_entries(last_index, last_term);
}
}
}

impl From<Option<Raft<MemStorage>>> for Interface {
Expand Down
12 changes: 8 additions & 4 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ fn next_ents(r: &mut Raft<MemStorage>, s: &MemStorage) -> Vec<Entry> {
}
let (last_idx, last_term) = (r.raft_log.last_index(), r.raft_log.last_term());
r.raft_log.stable_to(last_idx, last_term);
r.on_persist_entries(last_idx, last_term);
let ents = r.raft_log.next_entries();
r.commit_apply(r.raft_log.committed);
ents.unwrap_or_else(Vec::new)
Expand Down Expand Up @@ -301,6 +302,8 @@ fn test_progress_leader() {
let mut raft = new_test_raft(1, vec![1, 2], 5, 1, new_storage(), &l);
raft.become_candidate();
raft.become_leader();
// For no-op entry
raft.persist_entries();
raft.mut_prs().get_mut(2).unwrap().become_replicate();

let prop_msg = new_message(1, 1, MessageType::MsgPropose, 1);
Expand Down Expand Up @@ -2682,22 +2685,23 @@ fn test_bcast_beat() {
sm.become_candidate();
sm.become_leader();
for i in 0..10 {
sm.append_entry(&mut [empty_entry(0, i as u64 + 1)]);
sm.append_entry(&mut [empty_entry(0, offset + i + 1)]);
}
sm.persist_entries();
// slow follower
let mut_pr = |sm: &mut Interface, n, matched, next_idx| {
let m = sm.mut_prs().get_mut(n).unwrap();
m.matched = matched;
m.next_idx = next_idx;
};
// slow follower
mut_pr(&mut sm, 2, 5, 6);
mut_pr(&mut sm, 2, offset + 5, offset + 6);
// normal follower
let last_index = sm.raft_log.last_index();
mut_pr(&mut sm, 3, last_index, last_index + 1);

sm.step(new_message(0, 0, MessageType::MsgBeat, 0))
.expect("");
// Use Raft::step explicitly to bypass persist_entries inside which forwards commit index and sends append msg
Raft::step(&mut sm, new_message(0, 0, MessageType::MsgBeat, 0)).expect("");
let mut msgs = sm.read_messages();
assert_eq!(msgs.len(), 2);

Expand Down
Loading