Skip to content

Commit

Permalink
*: introduce joint quorum (#382)
Browse files Browse the repository at this point in the history
This PR ports joint quorum. It renames ProgressSet to ProgressTracker
and keeps both the names and the field layout the same as in etcd.

In addition, the PR fixes a performance issue discovered earlier, where
campaigning printed too many log messages.

Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
  • Loading branch information
BusyJay authored Jul 8, 2020
1 parent 5ca2e46 commit 91d8832
Show file tree
Hide file tree
Showing 13 changed files with 418 additions and 215 deletions.
30 changes: 15 additions & 15 deletions benches/suites/progress_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use crate::DEFAULT_RAFT_SETS;
use criterion::{Bencher, Criterion};
use raft::{Progress, ProgressSet};
use raft::{Progress, ProgressTracker};

pub fn bench_progress_set(c: &mut Criterion) {
bench_progress_set_new(c);
Expand All @@ -17,8 +17,8 @@ pub fn bench_progress_set(c: &mut Criterion) {
bench_progress_set_learners(c);
}

fn quick_progress_set(voters: usize, learners: usize) -> ProgressSet {
let mut set = ProgressSet::with_capacity(voters, learners, raft::default_logger());
fn quick_progress_set(voters: usize, learners: usize) -> ProgressTracker {
let mut set = ProgressTracker::with_capacity(voters, learners, 10, raft::default_logger());
(0..voters).for_each(|id| {
set.insert_voter(id as u64, Progress::new(0, 10)).ok();
});
Expand All @@ -31,23 +31,23 @@ fn quick_progress_set(voters: usize, learners: usize) -> ProgressSet {
pub fn bench_progress_set_new(c: &mut Criterion) {
let bench = |b: &mut Bencher| {
// No setup.
b.iter(|| ProgressSet::new(raft::default_logger()));
b.iter(|| ProgressTracker::new(256, raft::default_logger()));
};

c.bench_function("ProgressSet::new", bench);
c.bench_function("ProgressTracker::new", bench);
}

pub fn bench_progress_set_with_capacity(c: &mut Criterion) {
let bench = |voters, learners| {
move |b: &mut Bencher| {
// No setup.
b.iter(|| ProgressSet::with_capacity(voters, learners, raft::default_logger()));
b.iter(|| ProgressTracker::with_capacity(voters, learners, 10, raft::default_logger()));
}
};

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::with_capacity ({}, {})", voters, learners),
&format!("ProgressTracker::with_capacity ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand All @@ -66,7 +66,7 @@ pub fn bench_progress_set_insert_voter(c: &mut Criterion) {

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::insert_voter ({}, {})", voters, learners),
&format!("ProgressTracker::insert_voter ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand All @@ -85,7 +85,7 @@ pub fn bench_progress_set_insert_learner(c: &mut Criterion) {

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::insert_learner ({}, {})", voters, learners),
&format!("ProgressTracker::insert_learner ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand All @@ -104,7 +104,7 @@ pub fn bench_progress_set_remove(c: &mut Criterion) {

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::remove ({}, {})", voters, learners),
&format!("ProgressTracker::remove ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand All @@ -123,7 +123,7 @@ pub fn bench_progress_set_promote_learner(c: &mut Criterion) {

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::promote ({}, {})", voters, learners),
&format!("ProgressTracker::promote ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand All @@ -143,7 +143,7 @@ pub fn bench_progress_set_iter(c: &mut Criterion) {

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::iter ({}, {})", voters, learners),
&format!("ProgressTracker::iter ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand All @@ -166,7 +166,7 @@ pub fn bench_progress_set_voters(c: &mut Criterion) {

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::nodes ({}, {})", voters, learners),
&format!("ProgressTracker::nodes ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand All @@ -189,7 +189,7 @@ pub fn bench_progress_set_learners(c: &mut Criterion) {

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::nodes ({}, {})", voters, learners),
&format!("ProgressTracker::nodes ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand All @@ -210,7 +210,7 @@ pub fn bench_progress_set_get(c: &mut Criterion) {

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::get ({}, {})", voters, learners),
&format!("ProgressTracker::get ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand Down
2 changes: 1 addition & 1 deletion examples/five_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ fn on_ready(
ConfChangeType::RemoveNode => raft_group.raft.remove_node(node_id).unwrap(),
ConfChangeType::AddLearnerNode => raft_group.raft.add_learner(node_id).unwrap(),
}
let cs = raft_group.raft.prs().configuration().to_conf_state();
let cs = raft_group.raft.prs().conf().to_conf_state();
store.wl().set_conf_state(cs);
} else {
// For normal proposals, extract the key-value pair and then
Expand Down
26 changes: 13 additions & 13 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2929,7 +2929,7 @@ fn test_restore() {
s.get_metadata().term
);
assert_eq!(
*sm.prs().voter_ids(),
sm.prs().voter_ids().iter().collect::<HashSet<_>>(),
s.get_metadata()
.get_conf_state()
.voters
Expand Down Expand Up @@ -3143,7 +3143,7 @@ fn test_add_node() -> Result<()> {
let mut r = new_test_raft(1, vec![1], 10, 1, new_storage(), &l);
r.add_node(2)?;
assert_eq!(
*r.prs().voter_ids(),
r.prs().voter_ids().iter().collect::<HashSet<_>>(),
vec![1, 2].into_iter().collect::<HashSet<_>>()
);

Expand Down Expand Up @@ -3190,7 +3190,7 @@ fn test_remove_node() -> Result<()> {
let l = default_logger();
let mut r = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l);
r.remove_node(2)?;
assert_eq!(r.prs().voter_ids().iter().next().unwrap(), &1);
assert_eq!(r.prs().voter_ids().iter().next().unwrap(), 1);
// remove all nodes from cluster
r.remove_node(1)?;
assert!(r.prs().voter_ids().is_empty());
Expand All @@ -3203,7 +3203,7 @@ fn test_remove_node_itself() -> Result<()> {
let l = default_logger().new(o!("test" => "remove_node_itself"));
let mut n1 = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage(), &l);
n1.remove_node(1)?;
assert_eq!(n1.prs().learner_ids().iter().next().unwrap(), &2);
assert_eq!(n1.prs().learner_ids().iter().next().unwrap(), 2);
assert!(n1.prs().voter_ids().is_empty());
Ok(())
}
Expand Down Expand Up @@ -3235,9 +3235,9 @@ fn test_raft_nodes() {
];
for (i, (ids, wids)) in tests.drain(..).enumerate() {
let r = new_test_raft(1, ids, 10, 1, new_storage(), &l);
let voter_ids = r.prs().voter_ids();
let voter_ids: HashSet<_> = r.prs().voter_ids().iter().collect();
let wids = wids.into_iter().collect::<HashSet<_>>();
if *voter_ids != wids {
if voter_ids != wids {
panic!("#{}: nodes = {:?}, want {:?}", i, voter_ids, wids);
}
}
Expand Down Expand Up @@ -3918,12 +3918,12 @@ fn test_restore_with_learner() {
let conf_state = s.get_metadata().get_conf_state();
for &node in &conf_state.voters {
assert!(sm.prs().get(node).is_some());
assert!(!sm.prs().learner_ids().contains(&node));
assert!(!sm.prs().learner_ids().contains(node));
}

for &node in &conf_state.learners {
assert!(sm.prs().get(node).is_some());
assert!(sm.prs().learner_ids().contains(&node));
assert!(sm.prs().learner_ids().contains(node));
}

assert!(!sm.restore(s));
Expand Down Expand Up @@ -4011,8 +4011,8 @@ fn test_add_learner() -> Result<()> {
let mut n1 = new_test_raft(1, vec![1], 10, 1, new_storage(), &l);
n1.add_learner(2)?;

assert_eq!(*n1.prs().learner_ids().iter().next().unwrap(), 2);
assert!(n1.prs().learner_ids().contains(&2));
assert_eq!(n1.prs().learner_ids().iter().next().unwrap(), 2);
assert!(n1.prs().learner_ids().contains(2));

Ok(())
}
Expand All @@ -4027,11 +4027,11 @@ fn test_add_voter_peer_promotes_self_sets_is_learner() -> Result<()> {
// Node is already voter.
n1.add_learner(1).ok();
assert!(n1.promotable());
assert!(n1.prs().voter_ids().contains(&1));
assert!(n1.prs().voter_ids().contains(1));
n1.remove_node(1)?;
n1.add_learner(1)?;
assert!(!n1.promotable());
assert!(n1.prs().learner_ids().contains(&1));
assert!(n1.prs().learner_ids().contains(1));

Ok(())
}
Expand All @@ -4043,7 +4043,7 @@ fn test_remove_learner() -> Result<()> {
let l = default_logger();
let mut n1 = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage(), &l);
n1.remove_node(2)?;
assert_eq!(n1.prs().voter_ids().iter().next().unwrap(), &1);
assert_eq!(n1.prs().voter_ids().iter().next().unwrap(), 1);
assert!(n1.prs().learner_ids().is_empty());

n1.remove_node(1)?;
Expand Down
2 changes: 1 addition & 1 deletion harness/tests/integration_cases/test_raft_paper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ fn test_nonleader_start_election(state: StateRole, l: &Logger) {

assert_eq!(r.term, 2);
assert_eq!(r.state, StateRole::Candidate);
assert!(r.votes[&r.id]);
assert!(r.prs().votes()[&r.id]);
let mut msgs = r.read_messages();
msgs.sort_by_key(|m| format!("{:?}", m));
let new_message_ext = |f, to| {
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,10 +489,11 @@ pub mod util;
pub use self::config::Config;
pub use self::errors::{Error, Result, StorageError};
pub use self::log_unstable::Unstable;
pub use self::quorum::joint::Configuration as JointConfig;
pub use self::quorum::majority::Configuration as MajorityConfig;
pub use self::raft::{vote_resp_msg_type, Raft, SoftState, StateRole, INVALID_ID, INVALID_INDEX};
pub use self::raft_log::{RaftLog, NO_LIMIT};
pub use self::tracker::{Inflights, Progress, ProgressSet, ProgressState};
pub use self::tracker::{Inflights, Progress, ProgressState, ProgressTracker};

#[allow(deprecated)]
pub use self::raw_node::is_empty_snap;
Expand Down
1 change: 1 addition & 0 deletions src/quorum.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

pub mod joint;
pub mod majority;

use std::collections::HashMap;
Expand Down
83 changes: 83 additions & 0 deletions src/quorum/joint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use super::{AckedIndexer, VoteResult};
use crate::util::Union;
use crate::HashSet;
use crate::MajorityConfig;
use std::cmp;

/// A configuration of two groups of (possibly overlapping) majority configurations.
/// Decisions require the support of both majorities.
///
/// The constructors in this module only ever populate `incoming`; `outgoing`
/// starts out as the default `MajorityConfig`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Configuration {
/// The majority configuration filled in by `new` / `with_capacity`.
// NOTE(review): presumably the configuration being transitioned *to* during a
// joint membership change — confirm against the code that mutates it.
pub(crate) incoming: MajorityConfig,
/// The second majority configuration; left as `MajorityConfig::default()` by
/// the constructors in this module.
// NOTE(review): presumably the configuration being transitioned *away from*,
// and empty outside of a joint state — confirm against the code that mutates it.
pub(crate) outgoing: MajorityConfig,
}

impl Configuration {
    /// Builds a configuration whose incoming majority consists of `voters`.
    ///
    /// The outgoing majority is left at its default value.
    pub fn new(voters: HashSet<u64>) -> Configuration {
        let incoming = MajorityConfig::new(voters);
        let outgoing = MajorityConfig::default();
        Self { incoming, outgoing }
    }

    /// Builds an empty configuration whose incoming majority is created with
    /// the given capacity.
    ///
    /// The outgoing majority is left at its default value.
    pub fn with_capacity(cap: usize) -> Configuration {
        let incoming = MajorityConfig::with_capacity(cap);
        let outgoing = MajorityConfig::default();
        Self { incoming, outgoing }
    }

    /// Computes the largest index that is committed under the joint quorum,
    /// i.e. the smaller of the indexes committed by the two constituent
    /// majorities.
    ///
    /// The returned flag reports whether the index was successfully computed
    /// with the group commit algorithm; it is `true` only when both majorities
    /// report that they used group commit.
    pub fn committed_index(&self, use_group_commit: bool, l: &impl AckedIndexer) -> (u64, bool) {
        let (incoming_idx, incoming_gc) = self.incoming.committed_index(use_group_commit, l);
        let (outgoing_idx, outgoing_gc) = self.outgoing.committed_index(use_group_commit, l);
        let joint_idx = cmp::min(incoming_idx, outgoing_idx);
        let joint_gc = incoming_gc && outgoing_gc;
        (joint_idx, joint_gc)
    }

    /// Evaluates an election given `check`, which maps a voter ID to its
    /// yes/no (`true`/`false`) vote, or `None` if no vote is known for it.
    ///
    /// The result tells whether the vote is pending, lost, or won. Winning a
    /// joint quorum requires winning in both majorities.
    pub fn vote_result(&self, check: impl Fn(u64) -> Option<bool>) -> VoteResult {
        // The incoming side is always evaluated first, then the outgoing side.
        let incoming = self.incoming.vote_result(&check);
        let outgoing = self.outgoing.vote_result(check);
        match (incoming, outgoing) {
            // Losing on either side makes a joint win impossible.
            (VoteResult::Lost, _) | (_, VoteResult::Lost) => VoteResult::Lost,
            // Only winning on both sides counts as a win.
            (VoteResult::Won, VoteResult::Won) => VoteResult::Won,
            // Otherwise at least one side is still undecided.
            _ => VoteResult::Pending,
        }
    }

    /// Removes every ID from both majorities.
    pub fn clear(&mut self) {
        let Configuration { incoming, outgoing } = self;
        incoming.clear();
        outgoing.clear();
    }

    /// Returns true if (and only if) the configuration has exactly one voting
    /// member (i.e. the leader): a single incoming voter and no outgoing ones.
    pub fn is_singleton(&self) -> bool {
        let single_incoming = self.incoming.voters.len() == 1;
        let no_outgoing = self.outgoing.voters.is_empty();
        single_incoming && no_outgoing
    }

    /// Returns an iterator over the two voter sets without cloning them.
    pub fn ids(&self) -> Union<'_> {
        let incoming_voters = &self.incoming.voters;
        let outgoing_voters = &self.outgoing.voters;
        Union::new(incoming_voters, outgoing_voters)
    }

    /// Checks whether `id` is a voter in either majority.
    #[inline]
    pub fn contains(&self, id: u64) -> bool {
        [&self.incoming, &self.outgoing]
            .iter()
            .any(|cfg| cfg.voters.contains(&id))
    }
}
9 changes: 7 additions & 2 deletions src/quorum/majority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@ impl Configuration {

/// Returns the MajorityConfig as a sorted slice.
pub fn slice(&self) -> Vec<u64> {
let mut voters: Vec<_> = self.voters.iter().cloned().collect();
let mut voters = self.raw_slice();
voters.sort();
voters
}

/// Collects the voter IDs of this MajorityConfig into a newly allocated `Vec`,
/// in whatever order `voters` yields them (no sorting is applied).
pub fn raw_slice(&self) -> Vec<u64> {
    let ids = self.voters.iter().copied();
    ids.collect()
}

/// Computes the committed index from those supplied via the
/// provided AckedIndexer (for the active config).
///
Expand All @@ -43,7 +48,7 @@ impl Configuration {
if self.voters.is_empty() {
// This plays well with joint quorums which, when one half is the zero
// MajorityConfig, should behave like the other half.
return (u64::MAX, false);
return (u64::MAX, true);
}

let mut stack_arr: [MaybeUninit<Index>; 7] = unsafe { MaybeUninit::uninit().assume_init() };
Expand Down
Loading

0 comments on commit 91d8832

Please sign in to comment.