diff --git a/async-raft/src/core/client.rs b/async-raft/src/core/client.rs
index 25df09119..b1ddb8003 100644
--- a/async-raft/src/core/client.rs
+++ b/async-raft/src/core/client.rs
@@ -1,3 +1,4 @@
+use std::collections::BTreeSet;
 use std::sync::Arc;
 
 use anyhow::anyhow;
@@ -28,6 +29,7 @@ use crate::replication::RaftEvent;
 use crate::AppData;
 use crate::AppDataResponse;
 use crate::LogId;
+use crate::MessageSummary;
 use crate::RaftNetwork;
 use crate::RaftStorage;
 
@@ -52,6 +54,12 @@ impl ClientRequestEntry {
     }
 }
 
+impl MessageSummary for ClientRequestEntry {
+    fn summary(&self) -> String {
+        format!("entry:{}", self.entry.summary())
+    }
+}
+
 /// An enum type wrapping either a client response channel or an internal Raft response channel.
 #[derive(derive_more::From)]
 pub enum ClientOrInternalResponseTx {
@@ -232,7 +240,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
     }
 
     /// Handle client write requests.
-    #[tracing::instrument(level = "trace", skip(self, rpc, tx))]
+    #[tracing::instrument(level = "trace", skip(self, tx), fields(rpc=%rpc.summary()))]
     pub(super) async fn handle_client_write_request(
         &mut self,
         rpc: ClientWriteRequest<D>,
@@ -275,22 +283,35 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
     /// NOTE WELL: this routine does not wait for the request to actually finish replication, it
     /// merely beings the process. Once the request is committed to the cluster, its response will
     /// be generated asynchronously.
-    #[tracing::instrument(level = "trace", skip(self, req))]
+    #[tracing::instrument(level = "trace", skip(self, req), fields(req=%req.summary()))]
    pub(super) async fn replicate_client_request(&mut self, req: ClientRequestEntry) {
         // Replicate the request if there are other cluster members. The client response will be
         // returned elsewhere after the entry has been committed to the cluster.
         let entry_arc = req.entry.clone();
 
-        if self.nodes.is_empty() && self.non_voters.is_empty() {
+        // TODO(xp): calculate nodes set that need to replicate to, when updating membership
+        // TODO(xp): Or add to-non-voter replication into self.nodes.
+
+        let all_members = self.core.membership.all_nodes();
+        let non_voter_ids = self.non_voters.keys().copied().collect::<BTreeSet<_>>();
+
+        let joint_non_voter_ids = all_members.intersection(&non_voter_ids).collect::<BTreeSet<_>>();
+
+        let nodes = self.nodes.keys().collect::<BTreeSet<_>>();
+        tracing::debug!(?nodes, ?joint_non_voter_ids, "replicate_client_request");
+
+        let await_quorum = !self.nodes.is_empty() || !joint_non_voter_ids.is_empty();
+
+        if await_quorum {
+            self.awaiting_committed.push(req);
+        } else {
             // Else, there are no voting nodes for replication, so the payload is now committed.
             self.core.commit_index = entry_arc.log_id.index;
+            tracing::debug!(self.core.commit_index, "update commit index, no need to replicate");
             self.leader_report_metrics();
             self.client_request_post_commit(req).await;
-            return;
         }
 
-        self.awaiting_committed.push(req);
-
         if !self.nodes.is_empty() {
             for node in self.nodes.values() {
                 let _ = node.replstream.repl_tx.send((
diff --git a/async-raft/tests/replication_1_voter_to_isolated_non_voter.rs b/async-raft/tests/replication_1_voter_to_isolated_non_voter.rs
new file mode 100644
index 000000000..2d48507e8
--- /dev/null
+++ b/async-raft/tests/replication_1_voter_to_isolated_non_voter.rs
@@ -0,0 +1,56 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use anyhow::Result;
+use async_raft::Config;
+use fixtures::RaftRouter;
+use maplit::btreeset;
+
+#[macro_use]
+mod fixtures;
+
+/// Test replication to non-voter that is not in membership should not block.
+///
+/// What does this test do?
+///
+/// - bring on a cluster of 1 voter and 1 non-voter.
+/// - isolate replication to node 1.
+/// - client write should not be blocked.
+///
+/// export RUST_LOG=async_raft,memstore,replication_1_voter_to_isolated_non_voter=trace
+/// cargo test -p async-raft --test replication_1_voter_to_isolated_non_voter
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn replication_1_voter_to_isolated_non_voter() -> Result<()> {
+    let (_log_guard, ut_span) = init_ut!();
+    let _ent = ut_span.enter();
+
+    let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
+    let router = Arc::new(RaftRouter::new(config.clone()));
+
+    let mut n_logs = router.new_nodes_from_single(btreeset! {0}, btreeset! {1}).await?;
+
+    tracing::info!("--- stop replication to node 1");
+    {
+        router.isolate_node(1).await;
+
+        router.client_request_many(0, "0", (10 - n_logs) as usize).await;
+        n_logs = 10;
+
+        router.wait_for_log(&btreeset![0], n_logs, timeout(), "send log to trigger snapshot").await?;
+    }
+
+    tracing::info!("--- restore replication to node 1");
+    {
+        router.restore_node(1).await;
+
+        router.client_request_many(0, "0", (10 - n_logs) as usize).await;
+        n_logs = 10;
+
+        router.wait_for_log(&btreeset![0], n_logs, timeout(), "send log to trigger snapshot").await?;
+    }
+    Ok(())
+}
+
+fn timeout() -> Option<Duration> {
+    Some(Duration::from_millis(5000))
+}
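
The standalone sketch below restates, outside the patch, the quorum decision that the new await_quorum flag encodes: a client write waits for replication only if the leader has voter replication targets, or if some non-voter is also counted in the joint membership; otherwise the entry is committed immediately. needs_quorum, the u64 node-id type, and the main demo are illustrative assumptions, not the crate's API.

use std::collections::BTreeSet;

// Minimal sketch of the decision made in `replicate_client_request` above.
// All names here are hypothetical; only the set logic mirrors the diff.
fn needs_quorum(
    voter_targets: &BTreeSet<u64>, // keys of `self.nodes` in the diff
    all_members: &BTreeSet<u64>,   // `self.core.membership.all_nodes()`
    non_voters: &BTreeSet<u64>,    // keys of `self.non_voters`
) -> bool {
    // Non-voters that already appear in the (joint) membership still count
    // towards the replication targets the leader must wait for.
    let joint_non_voters: BTreeSet<_> = all_members.intersection(non_voters).collect();
    !voter_targets.is_empty() || !joint_non_voters.is_empty()
}

fn main() {
    // One voter (node 0) plus an isolated non-voter (node 1) that is not in
    // the membership: the write commits at once, matching the new test.
    let voter_targets = BTreeSet::new();
    let all_members = BTreeSet::from([0u64]);
    let non_voters = BTreeSet::from([1u64]);
    assert!(!needs_quorum(&voter_targets, &all_members, &non_voters));
}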