Skip to content

Commit

Permalink
refactor: explain membership
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Dec 29, 2021
1 parent 3511e43 commit c3139d3
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 25 deletions.
27 changes: 13 additions & 14 deletions async-raft/src/core/vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,26 +157,25 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
/// Spawn parallel vote requests to all cluster members.
#[tracing::instrument(level = "trace", skip(self))]
pub(super) fn spawn_parallel_vote_requests(&self) -> mpsc::Receiver<(VoteResponse, NodeId)> {
let all_members = self.core.effective_membership.membership.all_nodes().clone();
let (tx, rx) = mpsc::channel(all_members.len());
for member in all_members.into_iter().filter(|member| member != &self.core.id) {
let rpc = VoteRequest::new(
self.core.current_term,
self.core.id,
self.core.last_log_id.index,
self.core.last_log_id.term,
);
let all_nodes = self.core.effective_membership.membership.all_nodes().clone();
let (tx, rx) = mpsc::channel(all_nodes.len());

for member in all_nodes.into_iter().filter(|member| member != &self.core.id) {
let rpc = VoteRequest::new(self.core.current_term, self.core.id, self.core.last_log_id);

let (network, tx_inner) = (self.core.network.clone(), tx.clone());
let _ = tokio::spawn(
async move {
match network.send_vote(member, rpc).await {
Ok(res) => {
let _ = tx_inner.send((res, member)).await;
let res = network.send_vote(member, rpc).await;

match res {
Ok(vote_resp) => {
let _ = tx_inner.send((vote_resp, member)).await;
}
Err(err) => tracing::error!({error=%err, peer=member}, "error while requesting vote from peer"),
Err(err) => tracing::error!({error=%err, target=member}, "while requesting vote"),
}
}
.instrument(tracing::debug_span!("requesting vote from peer", target = member)),
.instrument(tracing::debug_span!("send_vote_req", target = member)),
);
}
rx
Expand Down
68 changes: 57 additions & 11 deletions async-raft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,8 +614,61 @@ impl<D: AppData> MessageSummary for EntryPayload<D> {
//////////////////////////////////////////////////////////////////////////////////////////////////

/// The membership configuration of the cluster.
/// Unlike original raft, the membership always a joint.
/// It could be a joint of one, two or more members, i.e., a quorum requires a majority of every members
///
/// It could be a joint of one, two or more members list, i.e., a quorum requires a majority of every members.
///
/// The structure of membership is actually a log:
/// ```text
/// 2-3: [6,7,8]
/// 1-2: [3,4,5]
/// 1-1: [1,2,3]
/// ```
///
/// Without any limitation, a node uses the **joint** of every config
/// as the effective quorum.
///
/// But **raft** tries to eliminate the items in a membership config to at most 2, e.g.:
/// single item config is the normal majority quorum,
/// double items config is the raft joint membership config.
///
/// To achieve this, raft has to guarantee that a 2-entries config contains all valid quorum:
/// E.g.: given the current config of node p and q as the following:
///
/// Node p:
/// ```text
/// A-B: [a,b,c]
/// 1-2: [3,4,5] <- commit_index
/// 1-1: [1,2,3]
/// ```
///
/// Node q:
/// ```text
/// X-Y: [x,y,z]
/// 1-1: [1,2,3] <- commit_index
/// ```
///
/// ```text
/// A-B <- p
/// |
/// 1-2 X-Y <- q
/// | /
/// 1-1
/// ```
///
/// If we knows about which log entry is committed,
/// the effective membership can be reduced to the joint of the last committed membership and all uncommitted
/// memberships, because:
///
/// - Two nodes has equal greatest committed membership always include the last committed membership in the joint config
/// so that they won't both become a leader.
///
/// - A node has smaller committed membership will see a higher log thus it won't be a new leader, such as q.
///
/// This way, to keep at most 2 member list in the membership config:
///
/// - raft does not allow two uncommitted membership in its log,
/// - and stores the last committed membership and the newly proposed membership in on log entry(because raft does not
/// store committed index), which is the joint membership entry.
#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Membership {
/// Multi configs.
Expand Down Expand Up @@ -665,8 +718,6 @@ impl Membership {
}

/// Check if the given NodeId exists in this membership config.
///
/// When in joint consensus, this will check both config groups.
pub fn contains(&self, x: &NodeId) -> bool {
for c in self.configs.iter() {
if c.contains(x) {
Expand Down Expand Up @@ -768,7 +819,6 @@ pub struct VoteRequest {
/// The candidate's current term.
pub term: u64,

/// The candidate's ID.
pub candidate_id: u64,

pub last_log_id: LogId,
Expand All @@ -781,15 +831,11 @@ impl MessageSummary for VoteRequest {
}

impl VoteRequest {
/// Create a new instance.
pub fn new(term: u64, candidate_id: u64, last_log_index: u64, last_log_term: u64) -> Self {
pub fn new(term: u64, candidate_id: u64, last_log_id: LogId) -> Self {
Self {
term,
candidate_id,
last_log_id: LogId {
term: last_log_term,
index: last_log_index,
},
last_log_id,
}
}
}
Expand Down

0 comments on commit c3139d3

Please sign in to comment.