Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: introduce joint quorum #382

Merged
merged 7 commits into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions benches/suites/progress_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use crate::DEFAULT_RAFT_SETS;
use criterion::{Bencher, Criterion};
use raft::{Progress, ProgressSet};
use raft::{Progress, ProgressTracker};

pub fn bench_progress_set(c: &mut Criterion) {
bench_progress_set_new(c);
Expand All @@ -17,8 +17,8 @@ pub fn bench_progress_set(c: &mut Criterion) {
bench_progress_set_learners(c);
}

fn quick_progress_set(voters: usize, learners: usize) -> ProgressSet {
let mut set = ProgressSet::with_capacity(voters, learners, raft::default_logger());
fn quick_progress_set(voters: usize, learners: usize) -> ProgressTracker {
let mut set = ProgressTracker::with_capacity(voters, learners, 10, raft::default_logger());
(0..voters).for_each(|id| {
set.insert_voter(id as u64, Progress::new(0, 10)).ok();
});
Expand All @@ -31,23 +31,23 @@ fn quick_progress_set(voters: usize, learners: usize) -> ProgressSet {
pub fn bench_progress_set_new(c: &mut Criterion) {
let bench = |b: &mut Bencher| {
// No setup.
b.iter(|| ProgressSet::new(raft::default_logger()));
b.iter(|| ProgressTracker::new(256, raft::default_logger()));
};

c.bench_function("ProgressSet::new", bench);
c.bench_function("ProgressTracker::new", bench);
}

pub fn bench_progress_set_with_capacity(c: &mut Criterion) {
let bench = |voters, learners| {
move |b: &mut Bencher| {
// No setup.
b.iter(|| ProgressSet::with_capacity(voters, learners, raft::default_logger()));
b.iter(|| ProgressTracker::with_capacity(voters, learners, 10, raft::default_logger()));
}
};

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::with_capacity ({}, {})", voters, learners),
&format!("ProgressTracker::with_capacity ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand All @@ -66,7 +66,7 @@ pub fn bench_progress_set_insert_voter(c: &mut Criterion) {

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::insert_voter ({}, {})", voters, learners),
&format!("ProgressTracker::insert_voter ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand All @@ -85,7 +85,7 @@ pub fn bench_progress_set_insert_learner(c: &mut Criterion) {

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::insert_learner ({}, {})", voters, learners),
&format!("ProgressTracker::insert_learner ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand All @@ -104,7 +104,7 @@ pub fn bench_progress_set_remove(c: &mut Criterion) {

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::remove ({}, {})", voters, learners),
&format!("ProgressTracker::remove ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand All @@ -123,7 +123,7 @@ pub fn bench_progress_set_promote_learner(c: &mut Criterion) {

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::promote ({}, {})", voters, learners),
&format!("ProgressTracker::promote ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand All @@ -143,7 +143,7 @@ pub fn bench_progress_set_iter(c: &mut Criterion) {

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::iter ({}, {})", voters, learners),
&format!("ProgressTracker::iter ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand All @@ -166,7 +166,7 @@ pub fn bench_progress_set_voters(c: &mut Criterion) {

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::nodes ({}, {})", voters, learners),
&format!("ProgressTracker::nodes ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand All @@ -189,7 +189,7 @@ pub fn bench_progress_set_learners(c: &mut Criterion) {

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::nodes ({}, {})", voters, learners),
&format!("ProgressTracker::nodes ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand All @@ -210,7 +210,7 @@ pub fn bench_progress_set_get(c: &mut Criterion) {

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::get ({}, {})", voters, learners),
&format!("ProgressTracker::get ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand Down
2 changes: 1 addition & 1 deletion examples/five_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ fn on_ready(
ConfChangeType::RemoveNode => raft_group.raft.remove_node(node_id).unwrap(),
ConfChangeType::AddLearnerNode => raft_group.raft.add_learner(node_id).unwrap(),
}
let cs = raft_group.raft.prs().configuration().to_conf_state();
let cs = raft_group.raft.prs().conf().to_conf_state();
store.wl().set_conf_state(cs);
} else {
// For normal proposals, extract the key-value pair and then
Expand Down
26 changes: 13 additions & 13 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2929,7 +2929,7 @@ fn test_restore() {
s.get_metadata().term
);
assert_eq!(
*sm.prs().voter_ids(),
sm.prs().voter_ids().iter().collect::<HashSet<_>>(),
s.get_metadata()
.get_conf_state()
.voters
Expand Down Expand Up @@ -3143,7 +3143,7 @@ fn test_add_node() -> Result<()> {
let mut r = new_test_raft(1, vec![1], 10, 1, new_storage(), &l);
r.add_node(2)?;
assert_eq!(
*r.prs().voter_ids(),
r.prs().voter_ids().iter().collect::<HashSet<_>>(),
vec![1, 2].into_iter().collect::<HashSet<_>>()
);

Expand Down Expand Up @@ -3190,7 +3190,7 @@ fn test_remove_node() -> Result<()> {
let l = default_logger();
let mut r = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l);
r.remove_node(2)?;
assert_eq!(r.prs().voter_ids().iter().next().unwrap(), &1);
assert_eq!(r.prs().voter_ids().iter().next().unwrap(), 1);
// remove all nodes from cluster
r.remove_node(1)?;
assert!(r.prs().voter_ids().is_empty());
Expand All @@ -3203,7 +3203,7 @@ fn test_remove_node_itself() -> Result<()> {
let l = default_logger().new(o!("test" => "remove_node_itself"));
let mut n1 = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage(), &l);
n1.remove_node(1)?;
assert_eq!(n1.prs().learner_ids().iter().next().unwrap(), &2);
assert_eq!(n1.prs().learner_ids().iter().next().unwrap(), 2);
assert!(n1.prs().voter_ids().is_empty());
Ok(())
}
Expand Down Expand Up @@ -3235,9 +3235,9 @@ fn test_raft_nodes() {
];
for (i, (ids, wids)) in tests.drain(..).enumerate() {
let r = new_test_raft(1, ids, 10, 1, new_storage(), &l);
let voter_ids = r.prs().voter_ids();
let voter_ids: HashSet<_> = r.prs().voter_ids().iter().collect();
let wids = wids.into_iter().collect::<HashSet<_>>();
if *voter_ids != wids {
if voter_ids != wids {
panic!("#{}: nodes = {:?}, want {:?}", i, voter_ids, wids);
}
}
Expand Down Expand Up @@ -3918,12 +3918,12 @@ fn test_restore_with_learner() {
let conf_state = s.get_metadata().get_conf_state();
for &node in &conf_state.voters {
assert!(sm.prs().get(node).is_some());
assert!(!sm.prs().learner_ids().contains(&node));
assert!(!sm.prs().learner_ids().contains(node));
}

for &node in &conf_state.learners {
assert!(sm.prs().get(node).is_some());
assert!(sm.prs().learner_ids().contains(&node));
assert!(sm.prs().learner_ids().contains(node));
}

assert!(!sm.restore(s));
Expand Down Expand Up @@ -4011,8 +4011,8 @@ fn test_add_learner() -> Result<()> {
let mut n1 = new_test_raft(1, vec![1], 10, 1, new_storage(), &l);
n1.add_learner(2)?;

assert_eq!(*n1.prs().learner_ids().iter().next().unwrap(), 2);
assert!(n1.prs().learner_ids().contains(&2));
assert_eq!(n1.prs().learner_ids().iter().next().unwrap(), 2);
assert!(n1.prs().learner_ids().contains(2));

Ok(())
}
Expand All @@ -4027,11 +4027,11 @@ fn test_add_voter_peer_promotes_self_sets_is_learner() -> Result<()> {
// Node is already voter.
n1.add_learner(1).ok();
assert!(n1.promotable());
assert!(n1.prs().voter_ids().contains(&1));
assert!(n1.prs().voter_ids().contains(1));
n1.remove_node(1)?;
n1.add_learner(1)?;
assert!(!n1.promotable());
assert!(n1.prs().learner_ids().contains(&1));
assert!(n1.prs().learner_ids().contains(1));

Ok(())
}
Expand All @@ -4043,7 +4043,7 @@ fn test_remove_learner() -> Result<()> {
let l = default_logger();
let mut n1 = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage(), &l);
n1.remove_node(2)?;
assert_eq!(n1.prs().voter_ids().iter().next().unwrap(), &1);
assert_eq!(n1.prs().voter_ids().iter().next().unwrap(), 1);
assert!(n1.prs().learner_ids().is_empty());

n1.remove_node(1)?;
Expand Down
2 changes: 1 addition & 1 deletion harness/tests/integration_cases/test_raft_paper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ fn test_nonleader_start_election(state: StateRole, l: &Logger) {

assert_eq!(r.term, 2);
assert_eq!(r.state, StateRole::Candidate);
assert!(r.votes[&r.id]);
assert!(r.prs().votes()[&r.id]);
let mut msgs = r.read_messages();
msgs.sort_by_key(|m| format!("{:?}", m));
let new_message_ext = |f, to| {
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,10 +489,11 @@ pub mod util;
pub use self::config::Config;
pub use self::errors::{Error, Result, StorageError};
pub use self::log_unstable::Unstable;
pub use self::quorum::joint::Configuration as JointConfig;
pub use self::quorum::majority::Configuration as MajorityConfig;
pub use self::raft::{vote_resp_msg_type, Raft, SoftState, StateRole, INVALID_ID, INVALID_INDEX};
pub use self::raft_log::{RaftLog, NO_LIMIT};
pub use self::tracker::{Inflights, Progress, ProgressSet, ProgressState};
pub use self::tracker::{Inflights, Progress, ProgressState, ProgressTracker};

#[allow(deprecated)]
pub use self::raw_node::is_empty_snap;
Expand Down
1 change: 1 addition & 0 deletions src/quorum.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

pub mod joint;
pub mod majority;

use std::collections::HashMap;
Expand Down
83 changes: 83 additions & 0 deletions src/quorum/joint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use super::{AckedIndexer, VoteResult};
use crate::util::Union;
use crate::HashSet;
use crate::MajorityConfig;
use std::cmp;

/// A configuration of two groups of (possibly overlapping) majority configurations.
/// Decisions require the support of both majorities.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Configuration {
pub(crate) incoming: MajorityConfig,
Connor1996 marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) outgoing: MajorityConfig,
}

impl Configuration {
/// Creates a new configuration using the given IDs.
pub fn new(voters: HashSet<u64>) -> Configuration {
Configuration {
incoming: MajorityConfig::new(voters),
outgoing: MajorityConfig::default(),
}
}

/// Creates an empty configuration with given capacity.
pub fn with_capacity(cap: usize) -> Configuration {
Configuration {
incoming: MajorityConfig::with_capacity(cap),
outgoing: MajorityConfig::default(),
}
}

/// Returns the largest committed index for the given joint quorum. An index is
/// jointly committed if it is committed in both constituent majorities.
BusyJay marked this conversation as resolved.
Show resolved Hide resolved
///
/// The bool flag indicates whether the index is computed by group commit algorithm
/// successfully. It's true only when both majorities use group commit.
pub fn committed_index(&self, use_group_commit: bool, l: &impl AckedIndexer) -> (u64, bool) {
let (i_idx, i_use_gc) = self.incoming.committed_index(use_group_commit, l);
let (o_idx, o_use_gc) = self.outgoing.committed_index(use_group_commit, l);
(cmp::min(i_idx, o_idx), i_use_gc && o_use_gc)
}

/// Takes a mapping of voters to yes/no (true/false) votes and returns a result
/// indicating whether the vote is pending, lost, or won. A joint quorum requires
/// both majority quorums to vote in favor.
pub fn vote_result(&self, check: impl Fn(u64) -> Option<bool>) -> VoteResult {
let i = self.incoming.vote_result(&check);
let o = self.outgoing.vote_result(check);
match (i, o) {
// It won if won in both.
(VoteResult::Won, VoteResult::Won) => VoteResult::Won,
// It lost if lost in either.
(VoteResult::Lost, _) | (_, VoteResult::Lost) => VoteResult::Lost,
// It remains pending if pending in both or just won in one side.
_ => VoteResult::Pending,
}
}

/// Clears all IDs.
pub fn clear(&mut self) {
self.incoming.clear();
self.outgoing.clear();
}

/// Returns true if (and only if) there is only one voting member
/// (i.e. the leader) in the current configuration.
pub fn is_singleton(&self) -> bool {
self.outgoing.voters.is_empty() && self.incoming.voters.len() == 1
}

/// Returns an iterator over two hash set without cloning.
pub fn ids(&self) -> Union<'_> {
Union::new(&self.incoming.voters, &self.outgoing.voters)
}

/// Check if an id is a voter.
#[inline]
pub fn contains(&self, id: u64) -> bool {
self.incoming.voters.contains(&id) || self.outgoing.voters.contains(&id)
}
}
9 changes: 7 additions & 2 deletions src/quorum/majority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@ impl Configuration {

/// Returns the MajorityConfig as a sorted slice.
pub fn slice(&self) -> Vec<u64> {
let mut voters: Vec<_> = self.voters.iter().cloned().collect();
let mut voters = self.raw_slice();
voters.sort();
voters
}

/// Returns the MajorityConfig as a slice.
pub fn raw_slice(&self) -> Vec<u64> {
self.voters.iter().cloned().collect()
}

/// Computes the committed index from those supplied via the
/// provided AckedIndexer (for the active config).
///
Expand All @@ -43,7 +48,7 @@ impl Configuration {
if self.voters.is_empty() {
// This plays well with joint quorums which, when one half is the zero
// MajorityConfig, should behave like the other half.
return (u64::MAX, false);
return (u64::MAX, true);
Connor1996 marked this conversation as resolved.
Show resolved Hide resolved
}

let mut stack_arr: [MaybeUninit<Index>; 7] = unsafe { MaybeUninit::uninit().assume_init() };
Expand Down
Loading