diff --git a/openraft/src/core/admin.rs b/openraft/src/core/admin.rs
index 61773c16c..c34e71ad5 100644
--- a/openraft/src/core/admin.rs
+++ b/openraft/src/core/admin.rs
@@ -246,7 +246,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
     ///
     /// This is only called by the leader.
     #[tracing::instrument(level = "debug", skip(self))]
-    pub(super) fn handle_uniform_consensus_committed(&mut self, log_id: &LogId) {
+    pub(super) async fn handle_uniform_consensus_committed(&mut self, log_id: &LogId) -> Result<(), StorageError> {
         tracing::info!("handle_uniform_consensus_committed at log id: {}", log_id);
 
         let index = log_id.index;
@@ -257,24 +257,32 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
             // TODO(xp): transfer leadership
             self.core.set_target_state(State::Learner);
             self.core.current_leader = None;
-            return;
+            return Ok(());
         }
 
         let membership = &self.core.effective_membership.membership;
 
-        for (id, state) in self.nodes.iter_mut() {
-            if membership.contains(id) {
-                continue;
-            }
+        let (_, committed_membership) = self.core.storage.last_applied_state().await?;
+
+        if let Some(prev) = committed_membership {
+            let prev_x = prev.membership.all_nodes().clone();
+            let curr = membership.all_nodes();
 
-            tracing::info!(
-                "set remove_after_commit for {} = {}, membership: {:?}",
-                id,
-                index,
-                self.core.effective_membership
-            );
+            let removed = prev_x.difference(curr);
+            for id in removed {
+                if let Some(state) = self.nodes.get_mut(id) {
+                    tracing::info!(
+                        "set remove_after_commit for {} = {}, membership: {:?}",
+                        id,
+                        index,
+                        self.core.effective_membership
+                    );
 
-            state.remove_since = Some(index)
+                    state.remove_since = Some(index)
+                } else {
+                    tracing::warn!("replication not found to target: {}", id)
+                }
+            }
         }
 
         let targets = self.nodes.keys().cloned().collect::<Vec<_>>();
@@ -283,6 +291,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
         }
 
         self.leader_report_metrics();
+        Ok(())
     }
 
     /// Remove a replication if the membership that does not include it has committed.
diff --git a/openraft/src/core/client.rs b/openraft/src/core/client.rs
index 23269faf8..4be6e3022 100644
--- a/openraft/src/core/client.rs
+++ b/openraft/src/core/client.rs
@@ -270,24 +270,26 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
         );
     }
 
-    pub fn handle_special_log(&mut self, entry: &Entry<D>) {
+    pub async fn handle_special_log(&mut self, entry: &Entry<D>) -> Result<(), StorageError> {
         match &entry.payload {
             EntryPayload::Membership(ref m) => {
                 if m.is_in_joint_consensus() {
                     // nothing to do
                 } else {
-                    self.handle_uniform_consensus_committed(&entry.log_id);
+                    self.handle_uniform_consensus_committed(&entry.log_id).await?;
                }
            }
            EntryPayload::Blank => {}
            EntryPayload::Normal(_) => {}
        }
+
+        Ok(())
    }
 
     /// Apply the given log entry to the state machine.
     #[tracing::instrument(level = "debug", skip(self, entry))]
     pub(super) async fn apply_entry_to_state_machine(&mut self, entry: &Entry<D>) -> Result<R, StorageError> {
-        self.handle_special_log(entry);
+        self.handle_special_log(entry).await?;
 
         // First, we just ensure that we apply any outstanding up to, but not including, the index
         // of the given entry. We need to be able to return the data response from applying this
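The core change above: `handle_uniform_consensus_committed` now loads the last *committed* membership from storage (hence the new `async` signature and `StorageError` result) and flags for removal only the nodes that were voters in the previous membership but are not voters in the new one. Learners never appear in the voter sets, so they are no longer swept up along with removed voters. A self-contained sketch of that set arithmetic, with hypothetical node ids standing in for openraft's types:

```rust
use std::collections::BTreeSet;

type NodeId = u64;

fn main() {
    // Voters of the last committed membership and of the new uniform membership.
    let prev: BTreeSet<NodeId> = BTreeSet::from([0, 1, 2]);
    let curr: BTreeSet<NodeId> = BTreeSet::from([0, 3, 4]);

    // Replication targets tracked by the leader (node 0): followers and learners.
    let targets: BTreeSet<NodeId> = BTreeSet::from([1, 2, 3, 4, 5]);

    // Old rule: flag every target absent from the new membership.
    // Learner 5 is never a voter, so it was flagged by mistake.
    let old_removed: BTreeSet<NodeId> = targets.iter().copied().filter(|id| !curr.contains(id)).collect();
    assert_eq!(old_removed, BTreeSet::from([1, 2, 5]));

    // New rule: flag only targets that left the voter set.
    let new_removed: BTreeSet<NodeId> = prev.difference(&curr).copied().collect();
    assert_eq!(new_removed, BTreeSet::from([1, 2]));
}
```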
diff --git a/openraft/tests/membership/main.rs b/openraft/tests/membership/main.rs
index e3bf9dc7b..fcf4c78f1 100644
--- a/openraft/tests/membership/main.rs
+++ b/openraft/tests/membership/main.rs
@@ -10,6 +10,7 @@ mod t10_add_learner;
 mod t10_remove_learner;
 mod t15_add_remove_follower;
 mod t20_change_membership;
+mod t21_change_membership_keep_learner;
 mod t25_elect_with_new_config;
 mod t30_commit_joint_config;
 mod t30_step_down;
diff --git a/openraft/tests/membership/t21_change_membership_keep_learner.rs b/openraft/tests/membership/t21_change_membership_keep_learner.rs
new file mode 100644
index 000000000..92dbbefb1
--- /dev/null
+++ b/openraft/tests/membership/t21_change_membership_keep_learner.rs
@@ -0,0 +1,61 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use maplit::btreeset;
+use openraft::Config;
+
+use crate::fixtures::RaftRouter;
+
+/// Given a cluster of voters {0,1,2} and learners {3,4,5},
+/// changing membership to {0,3,4} should only remove replication to {1,2}, not to node-5.
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn change_membership_keep_learners() -> anyhow::Result<()> {
+    let (_log_guard, ut_span) = init_ut!();
+    let _ent = ut_span.enter();
+
+    let lag_threshold = 1;
+
+    let config = Arc::new(
+        Config {
+            replication_lag_threshold: lag_threshold,
+            ..Default::default()
+        }
+        .validate()?,
+    );
+    let router = Arc::new(RaftRouter::new(config.clone()));
+
+    let mut log_index = router.new_nodes_from_single(btreeset! {0,1,2}, btreeset! {3,4,5}).await?;
+
+    tracing::info!("--- change membership to: 0,3,4");
+    router.change_membership(0, btreeset! {0,3,4}).await?;
+    log_index += 2;
+
+    tracing::info!("--- write 5 logs");
+    {
+        router.client_request_many(0, "foo", 5).await;
+        log_index += 5;
+
+        for id in [1, 2] {
+            assert!(router
+                .wait(&id, timeout())
+                .await?
+                .log(Some(log_index), "removed voters can not receive logs")
+                .await
+                .is_err());
+        }
+
+        for id in [0, 3, 4, 5] {
+            router
+                .wait(&id, timeout())
+                .await?
+                .log(Some(log_index), "other voters and learners receive all logs")
+                .await?;
+        }
+    }
+
+    Ok(())
+}
+
+fn timeout() -> Option<Duration> {
+    Some(Duration::from_millis(500))
+}
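A note on the test's bookkeeping: `log_index += 2` after `change_membership` accounts for the joint membership entry followed by the uniform one. The `remove_since` field set in the admin.rs hunk defers the actual teardown, per the doc comment in that file: a replication is removed only once the membership that excludes the node has committed. A toy model of that rule, where `ReplState` and `should_remove` are illustrative names rather than openraft's API:

```rust
/// Toy stand-in for the per-target replication state kept by the leader.
struct ReplState {
    /// Index of the membership log entry that removed this target, if any.
    remove_since: Option<u64>,
}

/// Tear down a replication stream only once the log entry that removed the
/// target is committed, so a removed voter still receives logs up to that point.
fn should_remove(state: &ReplState, committed: u64) -> bool {
    matches!(state.remove_since, Some(since) if committed >= since)
}

fn main() {
    let removed_voter = ReplState { remove_since: Some(8) };
    assert!(!should_remove(&removed_voter, 7)); // removal not committed yet: keep replicating
    assert!(should_remove(&removed_voter, 8)); // committed: safe to drop

    let kept_learner = ReplState { remove_since: None };
    assert!(!should_remove(&kept_learner, 100)); // never flagged, never dropped
}
```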