*: port coreos/etcd#9985 #340

Merged: 3 commits, Mar 3, 2020

10 changes: 5 additions & 5 deletions src/progress.rs
@@ -118,10 +118,10 @@ impl ProgressSet {
/// Adds a voter node
pub fn insert_voter(&mut self, id: u64, pr: Progress) -> Result<(), Error> {
if self.voters.contains_key(&id) {
Err(Error::Exists(id, "voters"))?
return Err(Error::Exists(id, "voters"));
}
if self.learners.contains_key(&id) {
Err(Error::Exists(id, "learners"))?;
return Err(Error::Exists(id, "learners"));
}
self.voters.insert(id, pr);
Ok(())
@@ -130,10 +130,10 @@ impl ProgressSet {
/// Adds a learner to the cluster
pub fn insert_learner(&mut self, id: u64, pr: Progress) -> Result<(), Error> {
if self.voters.contains_key(&id) {
Err(Error::Exists(id, "voters"))?
return Err(Error::Exists(id, "voters"));
}
if self.learners.contains_key(&id) {
Err(Error::Exists(id, "learners"))?
return Err(Error::Exists(id, "learners"));
}
self.learners.insert(id, pr);
Ok(())
@@ -150,7 +150,7 @@ impl ProgressSet {
/// Promote a learner to a peer.
pub fn promote_learner(&mut self, id: u64) -> Result<(), Error> {
if self.voters.contains_key(&id) {
Err(Error::Exists(id, "voters"))?;
return Err(Error::Exists(id, "voters"));
}
// We don't want to remove it unless it's there.
if self.learners.contains_key(&id) {
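The progress.rs changes above are stylistic: the old Err(e)? form compiles because the ? operator re-wraps an Err through From and returns early, but the explicit return Err(e) states the intent directly and avoids the needless conversion. A standalone sketch of both forms, using String as a stand-in for the crate's Error type:

// Illustration only; String stands in for the crate's Error type.
fn insert_old_style(exists: bool) -> Result<(), String> {
    if exists {
        // Old style: ? converts the error through From and returns it early.
        Err(String::from("voters: id already exists"))?
    }
    Ok(())
}

fn insert_new_style(exists: bool) -> Result<(), String> {
    if exists {
        // New style used by this PR: the early return is explicit.
        return Err(String::from("voters: id already exists"));
    }
    Ok(())
}

fn main() {
    assert!(insert_old_style(true).is_err());
    assert!(insert_new_style(true).is_err());
    assert!(insert_new_style(false).is_ok());
}
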
170 changes: 95 additions & 75 deletions src/raft.rs
Expand Up @@ -220,6 +220,16 @@ pub fn quorum(total: usize) -> usize {
total / 2 + 1
}

#[derive(Default)]
pub struct HandleResponseContext {
maybe_commit: bool,
send_append: bool,
loop_append: bool,
transfer_leader: bool,
old_paused: bool,
more_to_send: Vec<Message>,
}

impl<T: Storage> Raft<T> {
/// Creates a new raft for use on the node.
pub fn new(c: &Config, store: T) -> Raft<T> {
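For orientation, the sketch below models how these flags get consumed after check_message_with_progress returns, based on the step() changes later in this diff. It is a simplified standalone summary, not the merged code: the real flags drive calls on Raft<T>, and the more_to_send field is omitted here.

#[derive(Default)]
struct HandleResponseContext {
    maybe_commit: bool,
    send_append: bool,
    loop_append: bool,
    transfer_leader: bool,
    old_paused: bool,
}

// Models the post-processing in step(); commit_advanced stands in for self.maybe_commit().
fn post_process(mut ctx: HandleResponseContext, commit_advanced: bool) {
    if ctx.maybe_commit {
        if commit_advanced {
            println!("broadcast appends if the new commit index should be spread");
        } else if ctx.old_paused {
            // The peer was paused before this response; send the delayed append now.
            ctx.send_append = true;
        }
    }
    if ctx.send_append {
        println!("send one append (possibly empty, to carry the commit index)");
    }
    if ctx.loop_append {
        println!("keep sending non-empty, size-limited appends until drained");
    }
    if ctx.transfer_leader {
        println!("send MsgTimeoutNow to the transfer target");
    }
}

fn main() {
    let ctx = HandleResponseContext {
        maybe_commit: true,
        loop_append: true,
        ..Default::default()
    };
    post_process(ctx, true);
}
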
@@ -521,31 +531,46 @@ impl<T: Storage> Raft<T> {
}
}

/// Sends RPC, with entries to the given peer.
/// Sends an append RPC with new entries (if any) and the
/// current commit index to the given peer.
pub fn send_append(&mut self, to: u64, pr: &mut Progress) {
self.maybe_send_append(to, pr, true);
}

/// Sends an append RPC with new entries to the given peer,
/// if necessary. Returns true if a message was sent. The allow_empty
/// argument controls whether messages with no entries will be sent
/// ("empty" messages are useful to convey updated Commit indexes, but
/// are undesirable when we're sending multiple messages in a batch).
fn maybe_send_append(&mut self, to: u64, pr: &mut Progress, allow_empty: bool) -> bool {
if pr.is_paused() {
return;
return false;
}
let mut m = Message::new();
m.set_to(to);
if pr.pending_request_snapshot != INVALID_INDEX {
// Check pending request snapshot first to avoid unnecessary loading entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
return false;
}
} else {
let term = self.raft_log.term(pr.next_idx - 1);
let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
if term.is_err() || ents.is_err() {
// send snapshot if we failed to get term or entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
if !allow_empty && ents.as_ref().ok().map_or(true, |e| e.is_empty()) {
return false;
}
let term = self.raft_log.term(pr.next_idx - 1);
match (term, ents) {
(Ok(term), Ok(ents)) => self.prepare_send_entries(&mut m, pr, term, ents),
_ => {
// send snapshot if we failed to get term or entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return false;
}
}
} else {
self.prepare_send_entries(&mut m, pr, term.unwrap(), ents.unwrap());
}
}
self.send(m);
true
}

// send_heartbeat sends an empty MsgAppend
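As a usage sketch only (this helper is not part of the PR): a method inside the same impl block could combine the two calls to push both the commit index and every pending entry to a peer, relying on the false argument to keep the drain loop from emitting empty messages.

// Hypothetical helper inside impl<T: Storage> Raft<T>, mirroring what step() does
// below when both send_append and loop_append are set.
fn flush_append(&mut self, to: u64, pr: &mut Progress) {
    // The first message may be empty so the peer still learns the new commit index.
    self.send_append(to, pr);
    // Then drain the remaining entries in non-empty, size-limited batches.
    while self.maybe_send_append(to, pr, false) {}
}
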
@@ -1155,9 +1180,7 @@ impl<T: Storage> Raft<T> {
&mut self,
m: &Message,
prs: &mut ProgressSet,
old_paused: &mut bool,
send_append: &mut bool,
maybe_commit: &mut bool,
ctx: &mut HandleResponseContext,
) {
let pr = prs.get_mut(m.get_from()).unwrap();
pr.recent_active = true;
@@ -1181,56 +1204,61 @@ if pr.state == ProgressState::Replicate {
if pr.state == ProgressState::Replicate {
pr.become_probe();
}
*send_append = true;
ctx.send_append = true;
}
return;
}

*old_paused = pr.is_paused();
ctx.old_paused = pr.is_paused();
if !pr.maybe_update(m.get_index()) {
return;
}

match pr.state {
ProgressState::Probe => pr.become_replicate(),
ProgressState::Snapshot => {
if pr.maybe_snapshot_abort() {
debug!(
"{} snapshot aborted, resumed sending replication messages to {} \
[{:?}]",
self.tag,
m.get_from(),
pr
);
pr.become_probe();
}
}
ProgressState::Replicate => pr.ins.free_to(m.get_index()),
}
ctx.maybe_commit = true;
// We've updated flow control information above, which may
// allow us to send multiple (size-limited) in-flight messages
// at once (such as when transitioning from probe to
// replicate, or when freeTo() covers multiple messages). If
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
ctx.loop_append = true;

// Transfer leadership is in progress.
if let Some(lead_transferee) = self.lead_transferee {
if Some(m.get_from()) == self.lead_transferee {
let last_index = self.raft_log.last_index();
if m.get_from() == lead_transferee && pr.matched == last_index {
if pr.matched == last_index {
info!(
"{} sent MsgTimeoutNow to {} after received MsgAppResp",
self.tag,
m.get_from()
);
self.send_timeout_now(m.get_from());
}
}

match pr.state {
ProgressState::Probe => pr.become_replicate(),
ProgressState::Snapshot => {
if !pr.maybe_snapshot_abort() {
return;
}
debug!(
"{} snapshot aborted, resumed sending replication messages to {} \
[{:?}]",
self.tag,
m.get_from(),
pr
);
pr.become_probe();
ctx.transfer_leader = true;
}
ProgressState::Replicate => pr.ins.free_to(m.get_index()),
}
*maybe_commit = true;
}

fn handle_heartbeat_response(
&mut self,
m: &Message,
prs: &mut ProgressSet,
quorum: usize,
send_append: &mut bool,
more_to_send: &mut Option<Message>,
ctx: &mut HandleResponseContext,
) {
let pr = prs.get_mut(m.get_from()).unwrap();
pr.recent_active = true;
@@ -1242,7 +1270,7 @@ impl<T: Storage> Raft<T> {
}
// Does it request snapshot?
if pr.matched < self.raft_log.last_index() || pr.pending_request_snapshot != INVALID_INDEX {
*send_append = true;
ctx.send_append = true;
}

if self.read_only.option != ReadOnlyOption::Safe || m.get_context().is_empty() {
@@ -1269,7 +1297,7 @@ impl<T: Storage> Raft<T> {
to_send.set_msg_type(MessageType::MsgReadIndexResp);
to_send.set_index(rs.index);
to_send.set_entries(req.take_entries());
*more_to_send = Some(to_send);
ctx.more_to_send.push(to_send);
Review comment (Contributor): How about more_to_send: HashMap<u64, Message>? That way we would send only one read index response to each peer.

Reply (Member, Author): HashMap is slower than Vec, so it may not be a good idea. On the other hand, the optimization assumes there is a queue on the follower side, which may not be a valid case.

}
}
}
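To make the trade-off in the review exchange above concrete, here is a standalone sketch in which Msg is a simplified stand-in for eraftpb::Message: the merged Vec approach can queue several read-index responses for the same peer, while the suggested HashMap keyed by destination would keep only the last one per peer.

use std::collections::HashMap;

// Simplified stand-in for eraftpb::Message, for illustration only.
#[derive(Clone)]
struct Msg {
    to: u64,
    index: u64,
}

fn main() {
    // Two read-index responses end up queued for peer 2, one for peer 3.
    let responses = vec![
        Msg { to: 2, index: 5 },
        Msg { to: 2, index: 6 },
        Msg { to: 3, index: 5 },
    ];

    // What the PR merges: a Vec keeps every response, so peer 2 receives two messages.
    let queued: Vec<Msg> = responses.clone();
    assert_eq!(queued.len(), 3);

    // The reviewer's suggestion: key by destination so only the last response per peer survives.
    let mut deduped: HashMap<u64, Msg> = HashMap::new();
    for m in responses {
        deduped.insert(m.to, m);
    }
    assert_eq!(deduped.len(), 2);
    assert_eq!(deduped[&2].index, 6);
}
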
@@ -1281,9 +1309,8 @@ impl<T: Storage> Raft<T> {
}

let lead_transferee = m.get_from();
let last_lead_transferee = self.lead_transferee;
if last_lead_transferee.is_some() {
if last_lead_transferee.unwrap() == lead_transferee {
if let Some(last_lead_transferee) = self.lead_transferee {
if last_lead_transferee == lead_transferee {
info!(
"{} [term {}] transfer leadership to {} is in progress, ignores request \
to same node {}",
@@ -1294,9 +1321,7 @@ impl<T: Storage> Raft<T> {
self.abort_leader_transfer();
info!(
"{} [term {}] abort previous transferring leadership to {}",
self.tag,
self.term,
last_lead_transferee.unwrap()
self.tag, self.term, last_lead_transferee
);
}
if lead_transferee == self.id {
@@ -1353,14 +1378,7 @@ impl<T: Storage> Raft<T> {
}

/// Check message's progress to decide which action should be taken.
fn check_message_with_progress(
&mut self,
m: &mut Message,
send_append: &mut bool,
old_paused: &mut bool,
maybe_commit: &mut bool,
more_to_send: &mut Option<Message>,
) {
fn check_message_with_progress(&mut self, m: &mut Message, ctx: &mut HandleResponseContext) {
if self.prs().get(m.get_from()).is_none() {
debug!("{} no progress available for {}", self.tag, m.get_from());
return;
@@ -1369,11 +1387,11 @@ impl<T: Storage> Raft<T> {
let mut prs = self.take_prs();
match m.get_msg_type() {
MessageType::MsgAppendResponse => {
self.handle_append_response(m, &mut prs, old_paused, send_append, maybe_commit);
self.handle_append_response(m, &mut prs, ctx);
}
MessageType::MsgHeartbeatResponse => {
let quorum = quorum(prs.voters().len());
self.handle_heartbeat_response(m, &mut prs, quorum, send_append, more_to_send);
self.handle_heartbeat_response(m, &mut prs, quorum, ctx);
}
MessageType::MsgSnapStatus => {
let pr = prs.get_mut(m.get_from()).unwrap();
@@ -1526,37 +1544,39 @@ impl<T: Storage> Raft<T> {
_ => {}
}

let mut send_append = false;
let mut maybe_commit = false;
let mut old_paused = false;
let mut more_to_send = None;
self.check_message_with_progress(
&mut m,
&mut send_append,
&mut old_paused,
&mut maybe_commit,
&mut more_to_send,
);
if maybe_commit {
let mut ctx = HandleResponseContext::default();
self.check_message_with_progress(&mut m, &mut ctx);
if ctx.maybe_commit {
if self.maybe_commit() {
if self.should_bcast_commit() {
self.bcast_append();
}
} else if old_paused {
} else if ctx.old_paused {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
send_append = true;
ctx.send_append = true;
}
}

if send_append {
if ctx.send_append || ctx.loop_append {
let from = m.get_from();
let mut prs = self.take_prs();
self.send_append(from, prs.get_mut(from).unwrap());
let pr = prs.get_mut(from).unwrap();
if ctx.send_append {
self.send_append(from, pr);
}
if ctx.loop_append {
while self.maybe_send_append(from, pr, false) {}
}
self.set_prs(prs);
}
if let Some(to_send) = more_to_send {
self.send(to_send)
if ctx.transfer_leader {
self.send_timeout_now(m.get_from());
}
if !ctx.more_to_send.is_empty() {
for m in ctx.more_to_send.drain(..) {
self.send(m);
}
}

Ok(())
7 changes: 3 additions & 4 deletions src/raft_log.rs
@@ -430,8 +430,8 @@ impl<T: Storage> RaftLog<T> {
/// returned by value. The result is truncated to the max_size in bytes.
pub fn slice(&self, low: u64, high: u64, max_size: u64) -> Result<Vec<Entry>> {
let err = self.must_check_outofbounds(low, high);
if err.is_some() {
return Err(err.unwrap());
if let Some(err) = err {
return Err(err);
}

let mut ents = vec![];
@@ -443,8 +443,7 @@ impl<T: Storage> RaftLog<T> {
let stored_entries =
self.store
.entries(low, cmp::min(high, self.unstable.offset), max_size);
if stored_entries.is_err() {
let e = stored_entries.unwrap_err();
if let Err(e) = stored_entries {
match e {
Error::Store(StorageError::Compacted) => return Err(e),
Error::Store(StorageError::Unavailable) => panic!(
21 changes: 11 additions & 10 deletions src/storage.rs
@@ -31,6 +31,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};

use eraftpb::{ConfState, Entry, HardState, Snapshot};
@@ -211,19 +212,19 @@ impl MemStorageCore {
};

let offset = te[0].get_index() - self.entries[0].get_index();
if self.entries.len() as u64 > offset {
let mut new_entries: Vec<Entry> = vec![];
new_entries.extend_from_slice(&self.entries[..offset as usize]);
new_entries.extend_from_slice(te);
self.entries = new_entries;
} else if self.entries.len() as u64 == offset {
self.entries.extend_from_slice(te);
} else {
panic!(
match (self.entries.len() as u64).cmp(&offset) {
Ordering::Greater => {
let mut new_entries: Vec<Entry> = vec![];
new_entries.extend_from_slice(&self.entries[..offset as usize]);
new_entries.extend_from_slice(te);
self.entries = new_entries;
}
Ordering::Equal => self.entries.extend_from_slice(te),
_ => panic!(
"missing log entry [last: {}, append at: {}]",
self.inner_last_index(),
te[0].get_index()
)
),
}

Ok(())
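
The storage.rs change above replaces an if / else if / else chain over a length comparison with a match on std::cmp::Ordering, which makes the three cases exhaustive. A standalone sketch of the same dispatch with plain integers (the real code splices Entry slices rather than printing):

use std::cmp::Ordering;

fn main() {
    let existing_len: u64 = 5; // number of entries already in the store
    let offset: u64 = 3;       // where the incoming entries start, relative to the store

    match existing_len.cmp(&offset) {
        // Incoming entries overlap the stored tail: truncate at offset, then append.
        Ordering::Greater => println!("truncate at {} and append", offset),
        // Incoming entries start exactly at the end: plain append.
        Ordering::Equal => println!("append at the end"),
        // A gap would appear in the log: the caller violated the append contract.
        Ordering::Less => panic!(
            "missing log entry [last: {}, append at: {}]",
            existing_len, offset
        ),
    }
}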