From adc24f55d75d9c7c01fcd0f4f9e35dd5aae679aa Mon Sep 17 00:00:00 2001
From: drdr xp
Date: Mon, 16 Aug 2021 14:22:58 +0800
Subject: [PATCH] change: pass all logs to apply_entry_to_state_machine(), not just Normal logs.

Pass the whole `Entry` to `apply_entry_to_state_machine()`, not only the
`EntryPayload::Normal(normal_log)` payload. This way the state machine is
able to save the membership changes if it prefers to.

Why:

In practice, a snapshot contains info about all applied logs, including the
membership-config logs. Before this change, the state machine never received
a membership log, so when building a snapshot one had to walk through all
applied logs to find the last membership config included in the state
machine.

By letting the state machine remember the applied membership logs, snapshot
creation becomes more convenient and intuitive: it no longer needs to scan
the applied logs.
---
 async-raft/src/core/append_entries.rs        | 15 +--
 async-raft/src/core/client.rs                | 49 +++++-----
 async-raft/src/storage.rs                    |  9 +-
 .../tests/state_machine_apply_membership.rs  | 94 +++++++++++++++++++
 async-raft/tests/stepdown.rs                 |  2 +-
 memstore/src/lib.rs                          | 72 +++++++++-----
 memstore/src/test.rs                         | 32 +++++--
 7 files changed, 202 insertions(+), 71 deletions(-)
 create mode 100644 async-raft/tests/state_machine_apply_membership.rs

diff --git a/async-raft/src/core/append_entries.rs b/async-raft/src/core/append_entries.rs
index 19079766e..581364e77 100644
--- a/async-raft/src/core/append_entries.rs
+++ b/async-raft/src/core/append_entries.rs
@@ -225,10 +225,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
                 .filter_map(|idx| {
                     if let Some(entry) = self.entries_cache.remove(&idx) {
                         last_entry_seen = Some(entry.log_id);
-                        match entry.payload {
-                            EntryPayload::Normal(inner) => Some((entry.log_id, inner.data)),
-                            _ => None,
-                        }
+                        Some(entry)
                     } else {
                         None
                     }
@@ -251,7 +248,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
         let handle = tokio::spawn(async move {
             // Create a new vector of references to the entries data ... might have to change this
             // interface a bit before 1.0.
-            let entries_refs: Vec<_> = entries.iter().map(|(k, v)| (k, v)).collect();
+            let entries_refs: Vec<_> = entries.iter().collect();
             storage.replicate_to_state_machine(&entries_refs).await?;
             Ok(last_entry_seen)
         });
@@ -280,13 +277,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
         if let Some(entry) = entries.last() {
             new_last_applied = Some(entry.log_id);
         }
-        let data_entries: Vec<_> = entries
-            .iter()
-            .filter_map(|entry| match &entry.payload {
-                EntryPayload::Normal(inner) => Some((&entry.log_id, &inner.data)),
-                _ => None,
-            })
-            .collect();
+        let data_entries: Vec<_> = entries.iter().collect();
        if data_entries.is_empty() {
            return Ok(new_last_applied);
        }
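Note: with the filter removed above, the state machine receives every committed
entry, whatever its payload. A minimal self-contained sketch of the difference,
using simplified stand-in types rather than the real async-raft
`Entry`/`EntryPayload` definitions:

    // Simplified stand-ins, for illustration only.
    #[derive(Debug, Clone, Copy)]
    struct LogId {
        term: u64,
        index: u64,
    }

    enum EntryPayload<D> {
        Blank,                // blank entry committed by a new leader
        Normal(D),            // application data
        ConfigChange(String), // membership config, reduced to a String here
    }

    struct Entry<D> {
        log_id: LogId,
        payload: EntryPayload<D>,
    }

    fn main() {
        let entries = vec![
            Entry { log_id: LogId { term: 1, index: 1 }, payload: EntryPayload::Blank },
            Entry { log_id: LogId { term: 1, index: 2 }, payload: EntryPayload::Normal("set x=1") },
            Entry { log_id: LogId { term: 1, index: 3 }, payload: EntryPayload::ConfigChange("{0,1,2}".into()) },
        ];

        // Old behavior: only Normal entries survived the filter, so the state
        // machine never saw the membership change at index 3.
        let old: Vec<_> = entries
            .iter()
            .filter_map(|e| match &e.payload {
                EntryPayload::Normal(d) => Some((e.log_id, d)),
                _ => None,
            })
            .collect();
        assert_eq!(old.len(), 1);

        // New behavior: every entry is passed through, and the state machine
        // decides per payload kind what to record.
        let new: Vec<&Entry<_>> = entries.iter().collect();
        assert_eq!(new.len(), 3);
    }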
#[tracing::instrument(level = "trace", skip(self, req))] pub(super) async fn client_request_post_commit(&mut self, req: ClientRequestEntry) { + let entry = &req.entry; + match req.tx { ClientOrInternalResponseTx::Client(tx) => { - match &req.entry.payload { - EntryPayload::Normal(inner) => { - match self.apply_entry_to_state_machine(&req.entry.log_id, &inner.data).await { - Ok(data) => { - let _ = tx.send(Ok(ClientWriteResponse { - index: req.entry.log_id.index, - data, - })); - } - Err(err) => { - let _ = tx.send(Err(ClientWriteError::RaftError(err))); - } + match &entry.payload { + EntryPayload::Normal(_) => match self.apply_entry_to_state_machine(&entry).await { + Ok(data) => { + let _ = tx.send(Ok(ClientWriteResponse { + index: req.entry.log_id.index, + data, + })); } - } + Err(err) => { + let _ = tx.send(Err(ClientWriteError::RaftError(err))); + } + }, _ => { // Why is this a bug, and why are we shutting down? This is because we can not easily // encode these constraints in the type system, and client requests should be the only @@ -334,9 +334,15 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } } ClientOrInternalResponseTx::Internal(tx) => { - self.core.last_applied = req.entry.log_id; + // TODO(xp): copied from above, need refactor. + let res = match self.apply_entry_to_state_machine(&entry).await { + Ok(_data) => Ok(entry.log_id.index), + Err(err) => Err(err), + }; + + self.core.last_applied = entry.log_id; self.leader_report_metrics(); - let _ = tx.send(Ok(req.entry.log_id.index)); + let _ = tx.send(res); } } @@ -346,13 +352,14 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage /// Apply the given log entry to the state machine. #[tracing::instrument(level = "trace", skip(self, entry))] - pub(super) async fn apply_entry_to_state_machine(&mut self, log_id: &LogId, entry: &D) -> RaftResult { + pub(super) async fn apply_entry_to_state_machine(&mut self, entry: &Entry) -> RaftResult { // First, we just ensure that we apply any outstanding up to, but not including, the index // of the given entry. We need to be able to return the data response from applying this // entry to the state machine. // // Note that this would only ever happen if a node had unapplied logs from before becoming leader. + let log_id = &entry.log_id; let index = log_id.index; let expected_next_index = self.core.last_applied.index + 1; @@ -368,13 +375,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage self.core.last_applied = entry.log_id; } - let data_entries: Vec<_> = entries - .iter() - .filter_map(|entry| match &entry.payload { - EntryPayload::Normal(inner) => Some((&entry.log_id, &inner.data)), - _ => None, - }) - .collect(); + let data_entries: Vec<_> = entries.iter().collect(); if !data_entries.is_empty() { self.core .storage @@ -393,7 +394,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } } // Apply this entry to the state machine and return its data response. - let res = self.core.storage.apply_entry_to_state_machine(&log_id, entry).await.map_err(|err| { + let res = self.core.storage.apply_entry_to_state_machine(entry).await.map_err(|err| { if err.downcast_ref::().is_some() { // If this is an instance of the storage impl's shutdown error, then trigger shutdown. 
diff --git a/async-raft/src/storage.rs b/async-raft/src/storage.rs
index 5a21e8958..97e57cb53 100644
--- a/async-raft/src/storage.rs
+++ b/async-raft/src/storage.rs
@@ -178,6 +178,11 @@ where
     /// specific transaction is being started, or perhaps committed. This may be where a key/value
     /// is being stored. This may be where an entry is being appended to an immutable log.
     ///
+    /// An implementation should:
+    /// - Handle the EntryPayload::Normal() log, which carries the business-logic data.
+    /// - Optionally handle the EntryPayload::ConfigChange or EntryPayload::SnapshotPointer logs,
+    ///   if it is concerned with them, e.g. when it needs to track membership changes.
+    ///
     /// Error handling for this method is note worthy. If an error is returned from a call to this
     /// method, the error will be inspected, and if the error is an instance of
     /// `RaftStorage::ShutdownError`, then Raft will go into shutdown in order to preserve the
@@ -186,7 +191,7 @@ where
     ///
     /// It is important to note that even in cases where an application specific error is returned,
     /// implementations should still record that the entry has been applied to the state machine.
-    async fn apply_entry_to_state_machine(&self, index: &LogId, data: &D) -> Result<R>;
+    async fn apply_entry_to_state_machine(&self, data: &Entry<D>) -> Result<R>;

     /// Apply the given payload of entries to the state machine, as part of replication.
     ///
@@ -194,7 +199,7 @@ where
     /// have been replicated to a majority of the cluster, will be applied to the state machine.
     ///
     /// Errors returned from this method will cause Raft to go into shutdown.
-    async fn replicate_to_state_machine(&self, entries: &[(&LogId, &D)]) -> Result<()>;
+    async fn replicate_to_state_machine(&self, entries: &[&Entry<D>]) -> Result<()>;

     /// Perform log compaction, returning a handle to the generated snapshot.
     ///
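The doc comment added above is the heart of the change: an implementation that
records the last ConfigChange entry can produce a snapshot's membership without
scanning the log. A self-contained sketch of that payoff, with heavily
simplified types (`MembershipConfig` reduced here to a set of node ids, not the
real async-raft struct):

    use std::collections::HashSet;

    type MembershipConfig = HashSet<u64>;

    enum Payload {
        Normal(&'static str),
        ConfigChange(MembershipConfig),
    }

    #[derive(Default)]
    struct StateMachine {
        last_applied: u64,
        last_membership: Option<MembershipConfig>,
    }

    impl StateMachine {
        fn apply(&mut self, index: u64, payload: &Payload) {
            self.last_applied = index;
            // The state machine now sees config-change entries, so it can
            // simply remember the latest one.
            if let Payload::ConfigChange(m) = payload {
                self.last_membership = Some(m.clone());
            }
        }

        fn snapshot_membership(&self, log: &[Payload]) -> MembershipConfig {
            // New way: read one field, no log scan.
            if let Some(m) = &self.last_membership {
                return m.clone();
            }
            // Old way, kept as a fallback: walk the applied logs backwards
            // to find the last config change.
            log.iter()
                .rev()
                .find_map(|p| match p {
                    Payload::ConfigChange(m) => Some(m.clone()),
                    _ => None,
                })
                .unwrap_or_default()
        }
    }

    fn main() {
        let log = vec![
            Payload::ConfigChange([0].into()),
            Payload::Normal("set x=1"),
            Payload::ConfigChange([0, 1, 2].into()),
        ];
        let mut sm = StateMachine::default();
        for (i, p) in log.iter().enumerate() {
            sm.apply(i as u64 + 1, p);
        }
        assert_eq!(sm.snapshot_membership(&log), [0, 1, 2].into());
    }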
+ tracing::info!("--- initializing cluster"); + router.initialize_from_single_node(0).await?; + want += 1; + + router.wait_for_log(&hashset![0], want, None, "init").await?; + router.assert_stable_cluster(Some(1), Some(want)).await; + + for i in 0..=0 { + let sto = router.get_storage_handle(&i).await?; + let sm = sto.get_state_machine().await; + assert_eq!( + Some(MembershipConfig { + members: hashset![0], + members_after_consensus: None + }), + sm.last_membership + ); + } + + // Sync some new nodes. + router.new_raft_node(1).await; + router.new_raft_node(2).await; + router.new_raft_node(3).await; + router.new_raft_node(4).await; + + tracing::info!("--- adding new nodes to cluster"); + let mut new_nodes = futures::stream::FuturesUnordered::new(); + new_nodes.push(router.add_non_voter(0, 1)); + new_nodes.push(router.add_non_voter(0, 2)); + new_nodes.push(router.add_non_voter(0, 3)); + new_nodes.push(router.add_non_voter(0, 4)); + while let Some(inner) = new_nodes.next().await { + inner?; + } + + tracing::info!("--- changing cluster config"); + router.change_membership(0, hashset![0, 1, 2]).await?; + want += 2; + + router.wait_for_log(&hashset![0, 1, 2, 3, 4], want, None, "cluster of 5 candidates").await?; + + tracing::info!("--- check applied membership config"); + for i in 0..5 { + let sto = router.get_storage_handle(&i).await?; + let sm = sto.get_state_machine().await; + assert_eq!( + Some(MembershipConfig { + members: hashset![0, 1, 2], + members_after_consensus: None + }), + sm.last_membership + ); + } + + Ok(()) +} diff --git a/async-raft/tests/stepdown.rs b/async-raft/tests/stepdown.rs index 8fb5be71c..556b4553e 100644 --- a/async-raft/tests/stepdown.rs +++ b/async-raft/tests/stepdown.rs @@ -128,7 +128,7 @@ async fn stepdown() -> Result<()> { assert!(metrics.current_term >= 2, "term incr when leader changes"); router.assert_stable_cluster(Some(metrics.current_term), Some(want)).await; router - .assert_storage_state(metrics.current_term, want, None, LogId { term: 0, index: 0 }, None) + .assert_storage_state(metrics.current_term, want, None, LogId { term: 2, index: 4 }, None) .await; // ----------------------------------- ^^^ this is `0` instead of `4` because blank payloads from new leaders // and config change entries are never applied to the state machine. diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index 5c7f64ec8..bc5e25e1f 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -74,6 +74,9 @@ pub struct MemStoreSnapshot { #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct MemStoreStateMachine { pub last_applied_log: LogId, + + pub last_membership: Option, + /// A mapping of client IDs to their state info. pub client_serial_responses: HashMap)>, /// The current status of a client by ID. @@ -233,7 +236,7 @@ impl RaftStorage for MemStore { async fn get_log_entries(&self, start: u64, stop: u64) -> Result>> { // Invalid request, return empty vec. 
diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs
index 5c7f64ec8..bc5e25e1f 100644
--- a/memstore/src/lib.rs
+++ b/memstore/src/lib.rs
@@ -74,6 +74,9 @@ pub struct MemStoreSnapshot {
 #[derive(Serialize, Deserialize, Debug, Default, Clone)]
 pub struct MemStoreStateMachine {
     pub last_applied_log: LogId,
+
+    pub last_membership: Option<MembershipConfig>,
+
     /// A mapping of client IDs to their state info.
     pub client_serial_responses: HashMap<String, (u64, Option<String>)>,
     /// The current status of a client by ID.
@@ -233,7 +236,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
     async fn get_log_entries(&self, start: u64, stop: u64) -> Result<Vec<Entry<ClientRequest>>> {
         // Invalid request, return empty vec.
         if start > stop {
-            tracing::error!("invalid request, start > stop");
+            tracing::error!("get_log_entries: invalid request, start({}) > stop({})", start, stop);
             return Ok(vec![]);
         }
         let log = self.log.read().await;
@@ -243,7 +246,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
     #[tracing::instrument(level = "trace", skip(self))]
     async fn delete_logs_from(&self, start: u64, stop: Option<u64>) -> Result<()> {
         if stop.as_ref().map(|stop| &start > stop).unwrap_or(false) {
-            tracing::error!("invalid request, start > stop");
+            tracing::error!("delete_logs_from: invalid request, start({}) > stop({:?})", start, stop);
             return Ok(());
         }
         let mut log = self.log.write().await;
@@ -276,32 +279,55 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
         Ok(())
     }

-    #[tracing::instrument(level = "trace", skip(self, data))]
-    async fn apply_entry_to_state_machine(&self, index: &LogId, data: &ClientRequest) -> Result<ClientResponse> {
+    #[tracing::instrument(level = "trace", skip(self, entry))]
+    async fn apply_entry_to_state_machine(&self, entry: &Entry<ClientRequest>) -> Result<ClientResponse> {
         let mut sm = self.sm.write().await;
-        sm.last_applied_log = *index;
-        if let Some((serial, res)) = sm.client_serial_responses.get(&data.client) {
-            if serial == &data.serial {
-                return Ok(ClientResponse(res.clone()));
+        sm.last_applied_log = entry.log_id;
+
+        match entry.payload {
+            EntryPayload::Blank => Ok(ClientResponse(None)),
+            EntryPayload::SnapshotPointer(_) => Ok(ClientResponse(None)),
+            EntryPayload::Normal(ref norm) => {
+                let data = &norm.data;
+                if let Some((serial, res)) = sm.client_serial_responses.get(&data.client) {
+                    if serial == &data.serial {
+                        return Ok(ClientResponse(res.clone()));
+                    }
+                }
+                let previous = sm.client_status.insert(data.client.clone(), data.status.clone());
+                sm.client_serial_responses.insert(data.client.clone(), (data.serial, previous.clone()));
+                Ok(ClientResponse(previous))
             }
-        }
+            EntryPayload::ConfigChange(ref mem) => {
+                sm.last_membership = Some(mem.membership.clone());
+                Ok(ClientResponse(None))
+            }
-        let previous = sm.client_status.insert(data.client.clone(), data.status.clone());
-        sm.client_serial_responses.insert(data.client.clone(), (data.serial, previous.clone()));
-        Ok(ClientResponse(previous))
+        }
     }

     #[tracing::instrument(level = "trace", skip(self, entries))]
-    async fn replicate_to_state_machine(&self, entries: &[(&LogId, &ClientRequest)]) -> Result<()> {
+    async fn replicate_to_state_machine(&self, entries: &[&Entry<ClientRequest>]) -> Result<()> {
         let mut sm = self.sm.write().await;
-        for (index, data) in entries {
-            sm.last_applied_log = **index;
-            if let Some((serial, _)) = sm.client_serial_responses.get(&data.client) {
-                if serial == &data.serial {
-                    continue;
+        for entry in entries {
+            sm.last_applied_log = entry.log_id;
+
+            match entry.payload {
+                EntryPayload::Blank => {}
+                EntryPayload::SnapshotPointer(_) => {}
+                EntryPayload::Normal(ref norm) => {
+                    let data = &norm.data;
+                    if let Some((serial, _)) = sm.client_serial_responses.get(&data.client) {
+                        if serial == &data.serial {
+                            continue;
+                        }
+                    }
+                    let previous = sm.client_status.insert(data.client.clone(), data.status.clone());
+                    sm.client_serial_responses.insert(data.client.clone(), (data.serial, previous.clone()));
                 }
-            }
-            let previous = sm.client_status.insert(data.client.clone(), data.status.clone());
-            sm.client_serial_responses.insert(data.client.clone(), (data.serial, previous.clone()));
+                EntryPayload::ConfigChange(ref mem) => {
+                    sm.last_membership = Some(mem.membership.clone());
+                }
+            }
         }
         Ok(())
     }
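The replicate hunk above and the compaction hunk below share one contract:
every replicated entry advances last_applied_log, while only ConfigChange
entries touch last_membership. A minimal sketch of that contract with
simplified stand-in types (not the real memstore structs):

    #[derive(Clone, Copy, Debug, PartialEq)]
    struct LogId {
        term: u64,
        index: u64,
    }

    enum Payload {
        Blank,
        Normal(&'static str),
        ConfigChange(&'static str),
    }

    struct Entry {
        log_id: LogId,
        payload: Payload,
    }

    #[derive(Default)]
    struct Sm {
        last_applied_log: Option<LogId>,
        last_membership: Option<&'static str>,
    }

    fn replicate(sm: &mut Sm, entries: &[&Entry]) {
        for entry in entries {
            // Every entry, whatever its payload, advances the applied index.
            sm.last_applied_log = Some(entry.log_id);
            match &entry.payload {
                Payload::Blank => {}
                Payload::Normal(_data) => { /* business logic would run here */ }
                Payload::ConfigChange(m) => sm.last_membership = Some(*m),
            }
        }
    }

    fn main() {
        let entries = vec![
            Entry { log_id: LogId { term: 1, index: 1 }, payload: Payload::Blank },
            Entry { log_id: LogId { term: 1, index: 2 }, payload: Payload::ConfigChange("{0,1,2}") },
            Entry { log_id: LogId { term: 1, index: 3 }, payload: Payload::Normal("set x=1") },
        ];
        let mut sm = Sm::default();
        replicate(&mut sm, &entries.iter().collect::<Vec<_>>());
        assert_eq!(sm.last_applied_log, Some(LogId { term: 1, index: 3 }));
        assert_eq!(sm.last_membership, Some("{0,1,2}"));
    }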
#[tracing::instrument(level = "trace", skip(self))] async fn do_log_compaction(&self) -> Result> { let (data, last_applied_log); + let membership_config; { // Serialize the data of the state machine. let sm = self.sm.read().await; data = serde_json::to_vec(&*sm)?; last_applied_log = sm.last_applied_log; + membership_config = sm.last_membership.clone().unwrap_or_else(|| MembershipConfig::new_initial(self.id)); } // Release state machine read lock. let snapshot_size = data.len(); - let membership_config = self.get_membership_from_log(Some(last_applied_log.index)).await?; - let snapshot_idx = { let mut l = self.snapshot_idx.lock().unwrap(); *l += 1; diff --git a/memstore/src/test.rs b/memstore/src/test.rs index c03dc2cda..e8981ae89 100644 --- a/memstore/src/test.rs +++ b/memstore/src/test.rs @@ -1,6 +1,7 @@ use std::collections::HashSet; use async_raft::raft::EntryConfigChange; +use async_raft::raft::EntryNormal; use super::*; @@ -256,13 +257,18 @@ async fn test_replicate_to_log() -> Result<()> { async fn test_apply_entry_to_state_machine() -> Result<()> { let store = default_store_with_logs(); - store - .apply_entry_to_state_machine(&LogId { term: 3, index: 1 }, &ClientRequest { - client: "0".into(), - serial: 0, - status: "lit".into(), - }) - .await?; + let entry = Entry { + log_id: LogId { term: 3, index: 1 }, + + payload: EntryPayload::Normal(EntryNormal { + data: ClientRequest { + client: "0".into(), + serial: 0, + status: "lit".into(), + }, + }), + }; + store.apply_entry_to_state_machine(&entry).await?; let sm = store.get_state_machine().await; assert_eq!( @@ -305,12 +311,20 @@ async fn test_replicate_to_state_machine() -> Result<()> { serial: 0, status: "other".into(), }; + let entries = vec![ (&LogId { term: 3, index: 1 }, &req0), (&LogId { term: 3, index: 2 }, &req1), (&LogId { term: 3, index: 3 }, &req2), - ]; - store.replicate_to_state_machine(&entries).await?; + ] + .into_iter() + .map(|(id, req)| Entry { + log_id: *id, + payload: EntryPayload::Normal(EntryNormal { data: req.clone() }), + }) + .collect::>(); + + store.replicate_to_state_machine(&entries.iter().collect::>()).await?; let sm = store.get_state_machine().await; assert_eq!(