diff --git a/src/raft.rs b/src/raft.rs index 87a8ef7d3..ad211436f 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -220,6 +220,17 @@ pub fn quorum(total: usize) -> usize { total / 2 + 1 } +#[derive(Default)] +pub struct HandleResponseContext { + maybe_commit: bool, + has_reply: bool, + send_append: bool, + loop_append: bool, + transfer_leader: bool, + old_paused: bool, + more_to_send: Vec<Message>, +} + impl<T: Storage> Raft<T> { /// Creates a new raft for use on the node. pub fn new(c: &Config, store: T) -> Raft<T> { @@ -521,31 +532,45 @@ impl<T: Storage> Raft<T> { } } - /// Sends RPC, with entries to the given peer. + /// Sends an append RPC with new entries (if any) and the + /// current commit index to the given peer. pub fn send_append(&mut self, to: u64, pr: &mut Progress) { + self.maybe_send_append(to, pr, true); + } + + /// Sends an append RPC with new entries to the given peer, + /// if necessary. Returns true if a message was sent. The allow_empty + /// argument controls whether messages with no entries will be sent + /// ("empty" messages are useful to convey updated Commit indexes, but + /// are undesirable when we're sending multiple messages in a batch). + fn maybe_send_append(&mut self, to: u64, pr: &mut Progress, allow_empty: bool) -> bool { if pr.is_paused() { - return; + return false; } let mut m = Message::new(); m.set_to(to); if pr.pending_request_snapshot != INVALID_INDEX { // Check pending request snapshot first to avoid unnecessary loading entries. if !self.prepare_send_snapshot(&mut m, pr, to) { - return; + return false; } } else { - let term = self.raft_log.term(pr.next_idx - 1); let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size); + if !allow_empty && ents.as_ref().ok().map_or(true, |e| e.is_empty()) { + return false; + } + let term = self.raft_log.term(pr.next_idx - 1); if term.is_err() || ents.is_err() { // send snapshot if we failed to get term or entries. 
if !self.prepare_send_snapshot(&mut m, pr, to) { - return; + return false; } } else { self.prepare_send_entries(&mut m, pr, term.unwrap(), ents.unwrap()); } } self.send(m); + true } // send_heartbeat sends an empty MsgAppend @@ -1155,9 +1180,7 @@ impl Raft { &mut self, m: &Message, prs: &mut ProgressSet, - old_paused: &mut bool, - send_append: &mut bool, - maybe_commit: &mut bool, + ctx: &mut HandleResponseContext, ) { let pr = prs.get_mut(m.get_from()).unwrap(); pr.recent_active = true; @@ -1181,47 +1204,55 @@ impl Raft { if pr.state == ProgressState::Replicate { pr.become_probe(); } - *send_append = true; + ctx.send_append = true; + ctx.has_reply = true; } return; } - *old_paused = pr.is_paused(); + ctx.old_paused = pr.is_paused(); if !pr.maybe_update(m.get_index()) { return; } + match pr.state { + ProgressState::Probe => pr.become_replicate(), + ProgressState::Snapshot => { + if pr.maybe_snapshot_abort() { + debug!( + "{} snapshot aborted, resumed sending replication messages to {} \ + [{:?}]", + self.tag, + m.get_from(), + pr + ); + pr.become_probe(); + } + } + ProgressState::Replicate => pr.ins.free_to(m.get_index()), + } + ctx.maybe_commit = true; + // We've updated flow control information above, which may + // allow us to send multiple (size-limited) in-flight messages + // at once (such as when transitioning from probe to + // replicate, or when freeTo() covers multiple messages). If + // we have more entries to send, send as many messages as we + // can (without sending empty messages for the commit index) + ctx.loop_append = true; + ctx.has_reply = true; + // Transfer leadership is in progress. 
- if let Some(lead_transferee) = self.lead_transferee { + if Some(m.get_from()) == self.lead_transferee { let last_index = self.raft_log.last_index(); - if m.get_from() == lead_transferee && pr.matched == last_index { + if pr.matched == last_index { info!( "{} sent MsgTimeoutNow to {} after received MsgAppResp", self.tag, m.get_from() ); - self.send_timeout_now(m.get_from()); - } - } - - match pr.state { - ProgressState::Probe => pr.become_replicate(), - ProgressState::Snapshot => { - if !pr.maybe_snapshot_abort() { - return; - } - debug!( - "{} snapshot aborted, resumed sending replication messages to {} \ - [{:?}]", - self.tag, - m.get_from(), - pr - ); - pr.become_probe(); + ctx.transfer_leader = true; } - ProgressState::Replicate => pr.ins.free_to(m.get_index()), } - *maybe_commit = true; } fn handle_heartbeat_response( @@ -1229,8 +1260,7 @@ m: &Message, prs: &mut ProgressSet, quorum: usize, - send_append: &mut bool, - more_to_send: &mut Option<Message>, + ctx: &mut HandleResponseContext, ) { let pr = prs.get_mut(m.get_from()).unwrap(); pr.recent_active = true; @@ -1242,7 +1272,8 @@ } // Does it request snapshot? if pr.matched < self.raft_log.last_index() || pr.pending_request_snapshot != INVALID_INDEX { - *send_append = true; + ctx.send_append = true; + ctx.has_reply = true; } if self.read_only.option != ReadOnlyOption::Safe || m.get_context().is_empty() { @@ -1269,7 +1300,7 @@ to_send.set_msg_type(MessageType::MsgReadIndexResp); to_send.set_index(rs.index); to_send.set_entries(req.take_entries()); - *more_to_send = Some(to_send); + ctx.more_to_send.push(to_send); } } } @@ -1353,14 +1384,7 @@ } /// Check message's progress to decide which action should be taken. 
- fn check_message_with_progress( - &mut self, - m: &mut Message, - send_append: &mut bool, - old_paused: &mut bool, - maybe_commit: &mut bool, - more_to_send: &mut Option<Message>, - ) { + fn check_message_with_progress(&mut self, m: &mut Message, ctx: &mut HandleResponseContext) { if self.prs().get(m.get_from()).is_none() { debug!("{} no progress available for {}", self.tag, m.get_from()); return; @@ -1369,11 +1393,11 @@ let mut prs = self.take_prs(); match m.get_msg_type() { MessageType::MsgAppendResponse => { - self.handle_append_response(m, &mut prs, old_paused, send_append, maybe_commit); + self.handle_append_response(m, &mut prs, ctx); } MessageType::MsgHeartbeatResponse => { let quorum = quorum(prs.voters().len()); - self.handle_heartbeat_response(m, &mut prs, quorum, send_append, more_to_send); + self.handle_heartbeat_response(m, &mut prs, quorum, ctx); } MessageType::MsgSnapStatus => { let pr = prs.get_mut(m.get_from()).unwrap(); @@ -1526,37 +1550,40 @@ _ => {} } - let mut send_append = false; - let mut maybe_commit = false; - let mut old_paused = false; - let mut more_to_send = None; - self.check_message_with_progress( - &mut m, - &mut send_append, - &mut old_paused, - &mut maybe_commit, - &mut more_to_send, - ); - if maybe_commit { + let mut ctx = HandleResponseContext::default(); + self.check_message_with_progress(&mut m, &mut ctx); + if ctx.maybe_commit { if self.maybe_commit() { if self.should_bcast_commit() { self.bcast_append(); } - } else if ctx.old_paused { // update() reset the wait state on this node. If we had delayed sending // an update before, send it now. 
- send_append = true; + ctx.send_append = true; + ctx.has_reply = true; } } - if send_append { + if ctx.has_reply { let from = m.get_from(); let mut prs = self.take_prs(); - self.send_append(from, prs.get_mut(from).unwrap()); + let pr = prs.get_mut(from).unwrap(); + if ctx.send_append { + self.send_append(from, pr); + } + if ctx.loop_append { + while self.maybe_send_append(from, pr, false) {} + } self.set_prs(prs); } - if let Some(to_send) = more_to_send { - self.send(to_send) + if ctx.transfer_leader { + self.send_timeout_now(m.get_from()); + } + if !ctx.more_to_send.is_empty() { + for m in ctx.more_to_send.drain(..) { + self.send(m); + } } Ok(()) diff --git a/tests/integration_cases/test_raft.rs b/tests/integration_cases/test_raft.rs index 71ab42d24..10fbac59a 100644 --- a/tests/integration_cases/test_raft.rs +++ b/tests/integration_cases/test_raft.rs @@ -332,6 +332,74 @@ fn test_progress_paused() { assert_eq!(ms.len(), 1); } +#[test] +fn test_progress_flow_control() { + setup_for_test(); + let mut cfg = new_test_config(1, vec![1, 2], 5, 1); + cfg.max_inflight_msgs = 3; + cfg.max_size_per_msg = 2048; + let mut r = Interface::new(Raft::new(&cfg, new_storage())); + r.become_candidate(); + r.become_leader(); + + // Throw away all the messages relating to the initial election. + r.read_messages(); + + // While node 2 is in probe state, propose a bunch of entries. + r.mut_prs().get_mut(2).unwrap().become_probe(); + let data: String = std::iter::repeat('a').take(1000).collect(); + for _ in 0..10 { + let msg = new_message_with_entries( + 1, + 1, + MessageType::MsgPropose, + vec![new_entry(0, 0, Some(&data))], + ); + r.step(msg).unwrap(); + } + + let mut ms = r.read_messages(); + // First append has two entries: the empty entry to confirm the + // election, and the first proposal (only one proposal gets sent + // because we're in probe state). 
+ assert_eq!(ms.len(), 1); + assert_eq!(ms[0].msg_type, MessageType::MsgAppend); + assert_eq!(ms[0].entries.len(), 2); + assert_eq!(ms[0].entries[0].data.len(), 0); + assert_eq!(ms[0].entries[1].data.len(), 1000); + + // When this append is acked, we change to replicate state and can + // send multiple messages at once. + let mut msg = new_message(2, 1, MessageType::MsgAppendResponse, 0); + msg.index = ms[0].entries[1].index; + r.step(msg).unwrap(); + ms = r.read_messages(); + assert_eq!(ms.len(), 3); + for (i, m) in ms.iter().enumerate() { + if m.msg_type != MessageType::MsgAppend { + panic!("{}: expected MsgAppend, got {:?}", i, m.msg_type); + } + if m.entries.len() != 2 { + panic!("{}: expected 2 entries, got {}", i, m.entries.len()); + } + } + + // Ack all three of those messages together and get the last two + // messages (containing three entries). + let mut msg = new_message(2, 1, MessageType::MsgAppendResponse, 0); + msg.index = ms[2].entries[1].index; + r.step(msg).unwrap(); + ms = r.read_messages(); + assert_eq!(ms.len(), 2); + for (i, m) in ms.iter().enumerate() { + if m.msg_type != MessageType::MsgAppend { + panic!("{}: expected MsgAppend, got {:?}", i, m.msg_type); + } + } + assert_eq!(ms[0].entries.len(), 2); + assert_eq!(ms[1].entries.len(), 1); +} + #[test] fn test_leader_election() { setup_for_test(); @@ -2161,26 +2229,42 @@ fn test_read_only_option_safe() { assert_eq!(nt.peers[&1].state, StateRole::Leader); let mut tests = vec![ - (1, 10, 11, "ctx1"), - (2, 10, 21, "ctx2"), - (3, 10, 31, "ctx3"), - (1, 10, 41, "ctx4"), - (2, 10, 51, "ctx5"), - (3, 10, 61, "ctx6"), + (1, 10, 11, vec!["ctx1", "ctx11"], false), + (2, 10, 21, vec!["ctx2", "ctx22"], false), + (3, 10, 31, vec!["ctx3", "ctx33"], false), + (1, 10, 41, vec!["ctx4", "ctx44"], true), + (2, 10, 51, vec!["ctx5", "ctx55"], true), + (3, 10, 61, vec!["ctx6", "ctx66"], true), ]; - for (i, (id, proposals, wri, wctx)) in tests.drain(..).enumerate() { + for (i, (id, proposals, wri, wctx, pending)) in 
tests.drain(..).enumerate() { for _ in 0..proposals { nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); } - let e = new_entry(0, 0, Some(wctx)); - nt.send(vec![new_message_with_entries( + let msg1 = new_message_with_entries( id, id, MessageType::MsgReadIndex, - vec![e], - )]); + vec![new_entry(0, 0, Some(wctx[0]))], + ); + let msg2 = new_message_with_entries( + id, + id, + MessageType::MsgReadIndex, + vec![new_entry(0, 0, Some(wctx[1]))], + ); + + if pending { + // drop MsgHeartbeatResponse here to prevent leader handling pending ReadIndex request per round + nt.ignore(MessageType::MsgHeartbeatResponse); + nt.send(vec![msg1.clone(), msg1.clone(), msg2.clone()]); + nt.recover(); + // send a ReadIndex request with the last ctx to notify leader to handle pending read requests + nt.send(vec![msg2.clone()]); + } else { + nt.send(vec![msg1.clone(), msg1.clone(), msg2.clone()]); + } let read_states: Vec<ReadState> = nt .peers @@ -2192,16 +2276,18 @@ if read_states.is_empty() { panic!("#{}: read_states is empty, want non-empty", i); } - let rs = &read_states[0]; - if rs.index != wri { - panic!("#{}: read_index = {}, want {}", i, rs.index, wri) - } - let vec_wctx = wctx.as_bytes().to_vec(); - if rs.request_ctx != vec_wctx { - panic!( - "#{}: request_ctx = {:?}, want {:?}", - i, rs.request_ctx, vec_wctx - ) + assert_eq!(read_states.len(), wctx.len()); + for (rs, wctx) in read_states.iter().zip(wctx) { + if rs.index != wri { + panic!("#{}: read_index = {}, want {}", i, rs.index, wri) + } + let ctx_bytes = wctx.as_bytes().to_vec(); + if rs.request_ctx != ctx_bytes { + panic!( + "#{}: request_ctx = {:?}, want {:?}", + i, rs.request_ctx, ctx_bytes + ) + } } } } @@ -3502,12 +3588,12 @@ fn test_transfer_non_member() { setup_for_test(); let mut raft = new_test_raft(1, vec![2, 3, 4], 5, 1, new_storage()); raft.step(new_message(2, 1, MessageType::MsgTimeoutNow, 0)) - .expect("");; + .expect(""); raft.step(new_message(2, 1, 
MessageType::MsgRequestVoteResponse, 0)) - .expect("");; + .expect(""); raft.step(new_message(3, 1, MessageType::MsgRequestVoteResponse, 0)) - .expect("");; + .expect(""); assert_eq!(raft.state, StateRole::Follower); }