diff --git a/async-raft/src/core/append_entries.rs b/async-raft/src/core/append_entries.rs
index 0a4f1b397..a88237b82 100644
--- a/async-raft/src/core/append_entries.rs
+++ b/async-raft/src/core/append_entries.rs
@@ -29,6 +29,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
     ) -> RaftResult<AppendEntriesResponse> {
         tracing::debug!(%self.last_log_id);

+        let mut msg_entries = msg.entries.as_slice();
+        let mut prev_log_id = msg.prev_log_id;
+
         // If message's term is less than most recent term, then we do not honor the request.
         if msg.term < self.current_term {
             tracing::debug!({self.current_term, rpc_term=msg.term}, "AppendEntries RPC term is less than current term");
@@ -39,25 +42,45 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
             });
         }

-        // Update election timeout.
-        // TODO(xp): only update commit_index if the log present. e.g., append entries first, then update commit_index.
         self.update_next_election_timeout(true);
-        let mut report_metrics = false;

-        // The value for `self.commit_index` is only updated here when not the leader.
-        self.commit_index = msg.leader_commit;
+        // Caveat: We can not just delete `log[prev_log_id.index..]` (that would lose committed
+        // entries). The commit index must be updated only after append-entries,
+        // and it must point to a log entry that is consistent with the leader.
+        // Otherwise there is a chance of applying an uncommitted entry:
+        //
+        // ```
+        // R0 1,1  1,2  3,3
+        // R1 1,1  1,2  2,3
+        // R2 1,1  1,2  3,3
+        // ```
+        //
+        // - R0 to R1 append_entries: entries=[{1,2}], prev_log_id = {1,1}, commit_index = 3
+        // - R1 accepts this append_entries request but is not aware that entry {2,3} is
+        //   inconsistent with the leader; it would then update commit_index to 3 and apply {2,3}.
+
+        let valid_commit_index = msg_entries.last().map(|x| x.log_id.index).unwrap_or(prev_log_id.index);
+        let valid_commit_index = std::cmp::min(msg.leader_commit, valid_commit_index);
+
+        tracing::debug!("start to check and update to latest term/leader");
+        {
+            let mut report_metrics = false;
+
+            if msg.term > self.current_term {
+                self.update_current_term(msg.term, None);
+                self.save_hard_state().await?;
+                report_metrics = true;
+            }

-        // Update current term if needed.
-        if self.current_term != msg.term {
-            self.update_current_term(msg.term, None);
-            self.save_hard_state().await?;
-            report_metrics = true;
-        }
+            // Update current leader if needed.
+            if self.current_leader.as_ref() != Some(&msg.leader_id) {
+                self.update_current_leader(UpdateCurrentLeader::OtherNode(msg.leader_id));
+                report_metrics = true;
+            }

-        // Update current leader if needed.
-        if self.current_leader.as_ref() != Some(&msg.leader_id) {
-            self.update_current_leader(UpdateCurrentLeader::OtherNode(msg.leader_id));
-            report_metrics = true;
+            if report_metrics {
+                self.report_metrics(Update::Ignore);
+            }
         }

         // Transition to follower state if needed.
@@ -65,37 +88,15 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
             self.set_target_state(State::Follower);
         }

-        // If RPC's `prev_log_index` is 0, or the RPC's previous log info matches the local
-        // log info, then replication is g2g.
-        let msg_prev_index_is_min = msg.prev_log_id.index == u64::MIN;
-        let msg_index_and_term_match = msg.prev_log_id == self.last_log_id;
-
-        if msg_prev_index_is_min || msg_index_and_term_match {
-            if !msg.entries.is_empty() {
-                self.append_log_entries(&msg.entries).await?;
-            }
-            self.replicate_to_state_machine_if_needed().await?;
-
-            if report_metrics {
-                self.report_metrics(Update::Ignore);
-            }
-
-            return Ok(AppendEntriesResponse {
-                term: self.current_term,
-                success: true,
-                conflict_opt: None,
-            });
+        if prev_log_id.index == u64::MIN || prev_log_id == self.last_log_id {
+            // Matches! Great!
+            return self.append_apply_log_entries(msg_entries, valid_commit_index).await;
         }

-        /////////////////////////////////////
-        //// Begin Log Consistency Check ////
         tracing::debug!("begin log consistency check");

-        if self.last_log_id.index < msg.prev_log_id.index {
-            if report_metrics {
-                self.report_metrics(Update::Ignore);
-            }
-
+        // Lagging too far behind: let the leader retry append_entries from this node's last_log_id.index.
+        if self.last_log_id.index < prev_log_id.index {
             return Ok(AppendEntriesResponse {
                 term: self.current_term,
                 success: false,
@@ -105,93 +106,170 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
             });
         }

-        // last_log_id.index >= prev_log_id.index
+        // prev_log_id.index <= last_log_id.index

-        // Previous log info doesn't immediately line up, so perform log consistency check and proceed based on its
-        // result.
-        let prev_entry = self
-            .storage
-            .try_get_log_entry(msg.prev_log_id.index)
-            .await
-            .map_err(|err| self.map_fatal_storage_error(err))?;
+        // Log entries up to last_applied may have been removed, e.g., by snapshot installation or log compaction.
+        // The applied entries are also committed, and thus are always consistent with the leader.
+        // Align prev_log_id to last_applied.
+        let local_prev_log_id = self.earliest_log_id_since(prev_log_id.index).await?;

-        let target_entry = match prev_entry {
-            Some(target_entry) => target_entry,
-            None => {
-                // This can only happen if the target entry is removed, e.g., when installing snapshot or log
-                // compaction.
-                // Use the last known index & term as a conflict opt.
+        if prev_log_id.index < local_prev_log_id.index {
+            let distance = local_prev_log_id.index - prev_log_id.index;

-                if report_metrics {
-                    self.report_metrics(Update::Ignore);
-                }
+            prev_log_id = local_prev_log_id;

-                return Ok(AppendEntriesResponse {
-                    term: self.current_term,
-                    success: false,
-                    conflict_opt: Some(ConflictOpt {
-                        log_id: self.last_log_id,
-                    }),
-                });
-            }
-        };
+            msg_entries = if msg_entries.len() > distance as usize {
+                &msg_entries[distance as usize..]
+            } else {
+                &[]
+            };
+        }
+
+        // last_applied.index <= prev_log_id.index <= last_log_id.index

         // The target entry was found. Compare its term with target term to ensure everything is consistent.
-        if target_entry.log_id.term == msg.prev_log_id.term {
+        if local_prev_log_id == prev_log_id {
             // We've found a point of agreement with the leader. If we have any logs present
             // with an index greater than this, then we must delete them per §5.3.
-            if self.last_log_id.index > target_entry.log_id.index {
+
+            if self.last_log_id.index > prev_log_id.index {
+                self.delete_inconsistent_log(prev_log_id, msg_entries).await?;
+            }
+            tracing::debug!("end log consistency check");
+
+            return self.append_apply_log_entries(msg_entries, valid_commit_index).await;
+        }
+
+        let last_match = self.last_possible_matched(prev_log_id).await?;
+
+        Ok(AppendEntriesResponse {
+            term: self.current_term,
+            success: false,
+            conflict_opt: Some(ConflictOpt { log_id: last_match }),
+        })
+    }
+
+    #[tracing::instrument(level = "debug", skip(self))]
+    async fn earliest_log_id_since(&mut self, index: u64) -> RaftResult<LogId> {
+        if index <= self.last_applied.index {
+            return Ok(self.last_applied);
+        }
+
+        // last_applied.index < prev_log_id.index <= last_log_id.index
+
+        let x = self.storage.get_log_entries(index..=index).await.map_err(|err| self.map_fatal_storage_error(err))?;
+
+        let entry = x
+            .first()
+            .ok_or_else(|| self.map_fatal_storage_error(anyhow::anyhow!("log entry not found at: {}", index)))?;
+
+        Ok(entry.log_id)
+    }
+
+    #[tracing::instrument(level="debug", skip(self, msg_entries), fields(msg_entries=%msg_entries.summary()))]
+    async fn delete_inconsistent_log(&mut self, prev_log_id: LogId, msg_entries: &[Entry<D>]) -> RaftResult<()> {
+        // Caveat: Deleting and then appending entries is not atomic, so deleting consistent entries may cause
+        // loss of committed logs.
+        //
+        // E.g., assume the logs are as follows and R1 is now the leader:
+        //
+        // ```
+        // R1 1,1  1,2  1,3
+        // R2 1,1  1,2
+        // R3
+        // ```
+        //
+        // When the following steps take place, the committed entry `{1,2}` is lost:
+        //
+        // - R1 to R2: append_entries(entries=[{1,2}, {1,3}], prev_log_id={1,1})
+        // - R2 deletes {1,2}
+        // - R2 crashes
+        // - R2 is elected as leader and sees only {1,1}; the committed entry {1,2} is lost.
+        //
+        // **The safe way is to skip every entry that is present in the append_entries message, then delete only
+        // the inconsistent entries**.
+
+        let end = std::cmp::min(prev_log_id.index + msg_entries.len() as u64, self.last_log_id.index + 1);
+
+        tracing::debug!(
+            "find and delete inconsistent log entries [{}, {}), last_log_id: {}, entries: {}",
+            prev_log_id,
+            end,
+            self.last_log_id,
+            msg_entries.summary()
+        );
+
+        let entries = self
+            .storage
+            .get_log_entries(prev_log_id.index..end)
+            .await
+            .map_err(|err| self.map_fatal_storage_error(err))?;
+
+        for (i, ent) in entries.iter().enumerate() {
+            if ent.log_id.term != msg_entries[i].log_id.term {
+                tracing::debug!("delete inconsistent log entries from: {}", ent.log_id);
+
                 self.storage
-                    .delete_logs_from(target_entry.log_id.index + 1..)
+                    .delete_logs_from(ent.log_id.index..)
                     .await
                     .map_err(|err| self.map_fatal_storage_error(err))?;

+                let membership = self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?;
+                self.update_membership(membership)?;
+
+                break;
+            }
         }
-        // The target entry does not have the same term. Fetch the last 50 logs, and use the last
-        // entry of that payload which is still in the target term for conflict optimization.
-        else {
-            let mut start = if msg.prev_log_id.index >= 50 {
-                msg.prev_log_id.index - 50
-            } else {
-                0
-            };
-            if start == 0 {
-                start = 1
-            }
-            let old_entries = self
-                .storage
-                .get_log_entries(start..msg.prev_log_id.index)
-                .await
-                .map_err(|err| self.map_fatal_storage_error(err))?;
+
+        Ok(())
+    }

-            let first = old_entries.first().map(|x| x.log_id).unwrap_or_default();
+    /// Walks backward up to 50 entries to find the last log entry that has the same `term` as `prev_log_id`,
+    /// and is therefore consistent with the leader. Further replication will use this log id as `prev_log_id`
+    /// to sync logs.
+    #[tracing::instrument(level = "debug", skip(self))]
+    async fn last_possible_matched(&mut self, prev_log_id: LogId) -> RaftResult<LogId> {
+        let start = prev_log_id.index.saturating_sub(50);

-            let opt = match old_entries.iter().rev().find(|entry| entry.log_id.term <= msg.prev_log_id.term) {
-                Some(entry) => Some(ConflictOpt { log_id: entry.log_id }),
-                None => Some(ConflictOpt { log_id: first }),
-            };
-            if report_metrics {
-                self.report_metrics(Update::Ignore);
-            }
-            return Ok(AppendEntriesResponse {
-                term: self.current_term,
-                success: false,
-                conflict_opt: opt,
-            });
+        if start < self.last_applied.index {
+            // The applied log is always consistent with the leader.
+            return Ok(self.last_applied);
         }

-        ///////////////////////////////////
-        //// End Log Consistency Check ////
-        tracing::debug!("end log consistency check");
+        if start == 0 {
+            // A simple fallback: sync from the very beginning.
+            return Ok(LogId { term: 0, index: 0 });
+        }

-        self.append_log_entries(&msg.entries).await?;
-        self.replicate_to_state_machine_if_needed().await?;
-        if report_metrics {
-            self.report_metrics(Update::Ignore);
+        let old_entries = self
+            .storage
+            .get_log_entries(start..=prev_log_id.index)
+            .await
+            .map_err(|err| self.map_fatal_storage_error(err))?;
+
+        let last_matched = old_entries.iter().rev().find(|entry| entry.log_id.term == prev_log_id.term);
+
+        let first = old_entries.first().map(|x| x.log_id).unwrap();
+
+        let last_matched = last_matched.map(|x| x.log_id).unwrap_or_else(|| first);
+        Ok(last_matched)
+    }
+
+    #[tracing::instrument(level="debug", skip(self, entries), fields(entries=%entries.summary()))]
+    async fn append_apply_log_entries(
+        &mut self,
+        entries: &[Entry<D>],
+        commit_index: u64,
+    ) -> RaftResult<AppendEntriesResponse> {
+        if !entries.is_empty() {
+            self.append_log_entries(entries).await?;
         }
+
+        self.commit_index = commit_index;
+
+        self.replicate_to_state_machine_if_needed().await?;
+
+        self.report_metrics(Update::Ignore);
+
         Ok(AppendEntriesResponse {
             term: self.current_term,
             success: true,
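The skip-then-truncate rule in `delete_inconsistent_log` is the heart of this patch. Below is a minimal standalone sketch of that rule, with simplified types and a hypothetical `first_inconsistent` helper in place of the crate's API: entries already present with a matching term are never deleted, so a crash between the delete and the append can no longer drop a committed entry.

```rust
// Simplified stand-in for the crate's LogId; not the real type.
#[derive(Clone, Copy, PartialEq, Debug)]
struct LogId {
    term: u64,
    index: u64,
}

/// Hypothetical helper: compare the overlapping range of the local log with the
/// entries in the message, and return the index to truncate the local log from,
/// or `None` if every overlapping entry matches and nothing must be deleted.
fn first_inconsistent(local: &[LogId], msg: &[LogId]) -> Option<u64> {
    local
        .iter()
        .zip(msg.iter())
        .find(|(l, m)| l.term != m.term)
        .map(|(l, _)| l.index)
}

fn main() {
    // The local log diverges from the leader's entries at index 3: term 2 vs term 3.
    let local = [LogId { term: 1, index: 2 }, LogId { term: 2, index: 3 }];
    let msg = [LogId { term: 1, index: 2 }, LogId { term: 3, index: 3 }];

    // Index 2 matches and is kept; truncation starts at index 3 only.
    assert_eq!(first_inconsistent(&local, &msg), Some(3));
}
```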
diff --git a/async-raft/src/raft.rs b/async-raft/src/raft.rs
index abc32056c..3a7db9504 100644
--- a/async-raft/src/raft.rs
+++ b/async-raft/src/raft.rs
@@ -520,6 +520,24 @@ impl<D: AppData> Entry<D> {
     }
 }

+impl<D: AppData> MessageSummary for Entry<D> {
+    fn summary(&self) -> String {
+        format!("{}:{}", self.log_id, self.payload.summary())
+    }
+}
+
+impl<D: AppData> MessageSummary for &[Entry<D>] {
+    fn summary(&self) -> String {
+        let mut res = Vec::with_capacity(self.len());
+        for x in self.iter() {
+            let e = format!("{}:{}", x.log_id, x.payload.summary());
+            res.push(e);
+        }
+
+        res.join(",")
+    }
+}
+
 /// Log entry payload variants.
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 pub enum EntryPayload<D: AppData> {
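The `MessageSummary` impls above exist so that whole payloads can be rendered cheaply in `tracing` fields, as the new `fields(msg_entries=%msg_entries.summary())` attributes in `append_entries.rs` do. A rough standalone equivalent of what the slice impl produces, with a simplified `Entry` in place of the crate's generic type:

```rust
// Simplified stand-in for the crate's Entry<D>; not the real type.
struct Entry {
    log_id: String,
    payload: String,
}

/// Joins per-entry `log_id:payload` summaries with commas, mirroring the
/// behavior of the `&[Entry<D>]` impl in the patch.
fn summary(entries: &[Entry]) -> String {
    entries
        .iter()
        .map(|e| format!("{}:{}", e.log_id, e.payload))
        .collect::<Vec<_>>()
        .join(",")
}

fn main() {
    let entries = vec![
        Entry { log_id: "1-2".into(), payload: "blank".into() },
        Entry { log_id: "1-3".into(), payload: "normal".into() },
    ];
    assert_eq!(summary(&entries), "1-2:blank,1-3:normal");
}
```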
diff --git a/async-raft/tests/snapshot_overrides_membership.rs b/async-raft/tests/snapshot_overrides_membership.rs
index b6995030d..19f011d89 100644
--- a/async-raft/tests/snapshot_overrides_membership.rs
+++ b/async-raft/tests/snapshot_overrides_membership.rs
@@ -1,4 +1,5 @@
 use std::sync::Arc;
+use std::time::Duration;

 use anyhow::Result;
 use async_raft::raft::AppendEntriesRequest;
@@ -49,12 +50,12 @@ async fn snapshot_overrides_membership() -> Result<()> {
     {
         router.new_raft_node(0).await;

-        router.wait_for_log(&btreeset![0], want, None, "empty").await?;
-        router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?;
+        router.wait_for_log(&btreeset![0], want, timeout(), "empty").await?;
+        router.wait_for_state(&btreeset![0], State::NonVoter, timeout(), "empty").await?;

         router.initialize_from_single_node(0).await?;
         want += 1;

-        router.wait_for_log(&btreeset![0], want, None, "init leader").await?;
+        router.wait_for_log(&btreeset![0], want, timeout(), "init leader").await?;
         router.assert_stable_cluster(Some(1), Some(want)).await;
     }

@@ -63,10 +64,12 @@ async fn snapshot_overrides_membership() -> Result<()> {
         router.client_request_many(0, "0", (snapshot_threshold - want) as usize).await;
         want = snapshot_threshold;

-        router.wait_for_log(&btreeset![0], want, None, "send log to trigger snapshot").await?;
+        router.wait_for_log(&btreeset![0], want, timeout(), "send log to trigger snapshot").await?;
         router.assert_stable_cluster(Some(1), Some(want)).await;

-        router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: want }, None, "snapshot").await?;
+        router
+            .wait_for_snapshot(&btreeset![0], LogId { term: 1, index: want }, timeout(), "snapshot")
+            .await?;
         router
             .assert_storage_state(
                 1,
@@ -123,12 +126,12 @@ async fn snapshot_overrides_membership() -> Result<()> {
     {
         router.add_non_voter(0, 1).await.expect("failed to add new node as non-voter");

-        router.wait_for_log(&btreeset![0, 1], want, None, "add non-voter").await?;
+        router.wait_for_log(&btreeset![0, 1], want, timeout(), "add non-voter").await?;

         let expected_snap = Some((want.into(), 1, MembershipConfig {
             members: btreeset![0u64],
             members_after_consensus: None,
         }));
-        router.wait_for_snapshot(&btreeset![1], LogId { term: 1, index: want }, None, "").await?;
+        router.wait_for_snapshot(&btreeset![1], LogId { term: 1, index: want }, timeout(), "").await?;
         router
             .assert_storage_state(
                 1,
@@ -153,3 +156,7 @@ async fn snapshot_overrides_membership() -> Result<()> {

     Ok(())
 }
+
+fn timeout() -> Option<Duration> {
+    Some(Duration::from_millis(5000))
+}
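For reference, the R0/R1/R2 scenario in the commit-index caveat reduces to the following worked example (simplified types, not the crate's API): a follower must clamp `leader_commit` to the last index the message actually proves consistent, otherwise it may apply an uncommitted entry.

```rust
// Simplified stand-in for the crate's LogId; not the real type.
#[derive(Clone, Copy, Debug)]
struct LogId {
    term: u64,
    index: u64,
}

/// The clamping rule introduced by the patch: the last index this follower can
/// prove consistent with the leader is the last entry in the message, or
/// `prev_log_id` when the message is empty; never commit past it.
fn valid_commit_index(leader_commit: u64, prev_log_id: LogId, entries: &[LogId]) -> u64 {
    let last_consistent = entries.last().map(|e| e.index).unwrap_or(prev_log_id.index);
    std::cmp::min(leader_commit, last_consistent)
}

fn main() {
    // R0 to R1: entries=[{1,2}], prev_log_id={1,1}, leader_commit=3.
    let prev = LogId { term: 1, index: 1 };
    let entries = [LogId { term: 1, index: 2 }];

    // R1 still holds an inconsistent {2,3}; committing up to 3 would apply an
    // uncommitted entry, so the clamp stops at 2, the last verified index.
    assert_eq!(valid_commit_index(3, prev, &entries), 2);
}
```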