From f99ade30a7f806f18ed19ace12e226cd62fd43ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Mon, 4 Jul 2022 17:32:43 +0800 Subject: [PATCH] Change: API: move default impl methods in RaftStorage to StorageHelper --- openraft/src/core/append_entries.rs | 11 +- openraft/src/core/client.rs | 3 +- openraft/src/core/install_snapshot.rs | 3 +- openraft/src/core/mod.rs | 5 +- openraft/src/lib.rs | 2 + openraft/src/replication/mod.rs | 3 +- openraft/src/storage_helper.rs | 153 ++++++++++++++++ openraft/src/testing/suite.rs | 164 +++++++++--------- openraft/src/types/v070/storage.rs | 114 ------------ .../append_entries/t20_append_conflicts.rs | 3 +- .../t30_append_inconsistent_log.rs | 3 +- openraft/tests/initialization.rs | 3 +- openraft/tests/log_compaction/compaction.rs | 3 +- openraft/tests/membership/t10_add_learner.rs | 4 +- .../tests/membership/t20_change_membership.rs | 4 +- .../snapshot/snapshot_overrides_membership.rs | 6 +- .../snapshot_uses_prev_snap_membership.rs | 10 +- .../state_machine/t40_clean_applied_logs.rs | 4 +- 18 files changed, 276 insertions(+), 222 deletions(-) create mode 100644 openraft/src/storage_helper.rs diff --git a/openraft/src/core/append_entries.rs b/openraft/src/core/append_entries.rs index 9824d390d..9fafa28db 100644 --- a/openraft/src/core/append_entries.rs +++ b/openraft/src/core/append_entries.rs @@ -15,6 +15,7 @@ use crate::MessageSummary; use crate::RaftNetwork; use crate::RaftStorage; use crate::StorageError; +use crate::StorageHelper; use crate::Update; impl, S: RaftStorage> RaftCore { @@ -113,7 +114,7 @@ impl, S: RaftStorage> Ra // TODO(xp): get_membership() should have a defensive check to ensure it always returns Some() if node is // initialized. Because a node always commit a membership log as the first log entry. - let membership = self.storage.get_membership().await?; + let membership = StorageHelper::new(&self.storage).get_membership().await?; // TODO(xp): This is a dirty patch: // When a node starts in a single-node mode, it does not append an initial log @@ -284,7 +285,7 @@ impl, S: RaftStorage> Ra let index = log_id.index; // TODO(xp): this is a naive impl. Batch loading entries from storage. - let log = self.storage.try_get_log_entry(index).await?; + let log = StorageHelper::new(&self.storage).try_get_log_entry(index).await?; if let Some(local) = log { if local.log_id == log_id { @@ -317,7 +318,7 @@ impl, S: RaftStorage> Ra let index = log_id.index; - let log = self.storage.try_get_log_entry(index).await?; + let log = StorageHelper::new(&self.storage).try_get_log_entry(index).await?; tracing::debug!( "check log id matching: local: {:?} remote: {}", log.as_ref().map(|x| x.log_id), @@ -393,7 +394,9 @@ impl, S: RaftStorage> Ra // Drain entries from the beginning of the cache up to commit index. - let entries = self.storage.get_log_entries(self.last_applied.next_index()..self.committed.next_index()).await?; + let entries = StorageHelper::new(&self.storage) + .get_log_entries(self.last_applied.next_index()..self.committed.next_index()) + .await?; let last_log_id = entries.last().map(|x| x.log_id).unwrap(); diff --git a/openraft/src/core/client.rs b/openraft/src/core/client.rs index 63f173e7a..d010558a8 100644 --- a/openraft/src/core/client.rs +++ b/openraft/src/core/client.rs @@ -28,6 +28,7 @@ use crate::MessageSummary; use crate::RaftNetwork; use crate::RaftStorage; use crate::StorageError; +use crate::StorageHelper; /// A wrapper around a ClientRequest which has been transformed into an Entry, along with its response channel. pub(super) struct ClientRequestEntry { @@ -304,7 +305,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage }; if index != expected_next_index { - let entries = self.core.storage.get_log_entries(expected_next_index..index).await?; + let entries = StorageHelper::new(&self.core.storage).get_log_entries(expected_next_index..index).await?; if let Some(entry) = entries.last() { self.core.last_applied = Some(entry.log_id); diff --git a/openraft/src/core/install_snapshot.rs b/openraft/src/core/install_snapshot.rs index 7d7c135b8..26b1a1c84 100644 --- a/openraft/src/core/install_snapshot.rs +++ b/openraft/src/core/install_snapshot.rs @@ -21,6 +21,7 @@ use crate::RaftNetwork; use crate::RaftStorage; use crate::SnapshotSegmentId; use crate::StorageError; +use crate::StorageHelper; use crate::StorageIOError; use crate::Update; @@ -249,7 +250,7 @@ impl, S: RaftStorage> Ra } // There could be unknown membership in the snapshot. - let membership = self.storage.get_membership().await?; + let membership = StorageHelper::new(&self.storage).get_membership().await?; tracing::debug!("storage membership: {:?}", membership); assert!(membership.is_some()); diff --git a/openraft/src/core/mod.rs b/openraft/src/core/mod.rs index a0e170f64..68fcf38f2 100644 --- a/openraft/src/core/mod.rs +++ b/openraft/src/core/mod.rs @@ -62,6 +62,7 @@ use crate::types::v070::RaftNetwork; use crate::types::v070::RaftStorage; use crate::types::v070::StorageError; use crate::MessageSummary; +use crate::StorageHelper; use crate::Update; impl EffectiveMembership { @@ -223,7 +224,7 @@ impl, S: RaftStorage> Ra async fn do_main(&mut self) -> Result<(), Fatal> { tracing::debug!("raft node is initializing"); - let state = self.storage.get_initial_state().await?; + let state = StorageHelper::new(&self.storage).get_initial_state().await?; // TODO(xp): this is not necessary. self.storage.save_hard_state(&state.hard_state).await?; @@ -600,7 +601,7 @@ where return Ok(()); } - let log_id = sto.get_log_id(end - 1).await?; + let log_id = StorageHelper::new(&sto).get_log_id(end - 1).await?; sto.purge_logs_upto(log_id).await } diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index 46d5313a5..78fa6306d 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -22,6 +22,7 @@ pub mod types; #[cfg(test)] mod metrics_wait_test; +mod storage_helper; pub use async_trait; @@ -35,6 +36,7 @@ pub use crate::raft::Raft; pub use crate::raft_types::LogIdOptionExt; pub use crate::raft_types::Update; pub use crate::replication::ReplicationMetrics; +pub use crate::storage_helper::StorageHelper; pub use crate::store_ext::StoreExt; pub use crate::store_wrapper::Wrapper; pub use crate::summary::MessageSummary; diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 030e8f0dd..b91fe7e64 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -35,6 +35,7 @@ use crate::NodeId; use crate::RaftNetwork; use crate::RaftStorage; use crate::Snapshot; +use crate::StorageHelper; #[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct ReplicationMetrics { @@ -297,7 +298,7 @@ impl, S: RaftStorage> Re let prev_log_id = if prev_index == last_purged.index() { last_purged } else if let Some(prev_i) = prev_index { - let first = self.storage.try_get_log_entry(prev_i).await?; + let first = StorageHelper::new(&self.storage).try_get_log_entry(prev_i).await?; match first { Some(f) => Some(f.log_id), None => { diff --git a/openraft/src/storage_helper.rs b/openraft/src/storage_helper.rs new file mode 100644 index 000000000..bb1811582 --- /dev/null +++ b/openraft/src/storage_helper.rs @@ -0,0 +1,153 @@ +use std::fmt::Debug; +use std::marker::PhantomData; +use std::ops::RangeBounds; + +use crate::defensive::check_range_matches_entries; +use crate::AppData; +use crate::AppDataResponse; +use crate::EffectiveMembership; +use crate::Entry; +use crate::EntryPayload; +use crate::InitialState; +use crate::LogId; +use crate::LogIdOptionExt; +use crate::RaftStorage; +use crate::StorageError; + +/// StorageHelper provides additional methods to access a RaftStorage implementation. +pub struct StorageHelper<'a, D, R, S, Sto> +where + D: AppData, + R: AppDataResponse, + + S: RaftStorage, + Sto: AsRef, +{ + pub(crate) sto: &'a Sto, + _p: PhantomData<(D, R, S)>, +} + +impl<'a, D, R, S, Sto> StorageHelper<'a, D, R, S, Sto> +where + D: AppData, + R: AppDataResponse, + S: RaftStorage, + Sto: AsRef, +{ + pub fn new(sto: &'a Sto) -> Self { + Self { + sto, + _p: Default::default(), + } + } + + /// Get Raft's state information from storage. + /// + /// When the Raft node is first started, it will call this interface to fetch the last known state from stable + /// storage. + pub async fn get_initial_state(&self) -> Result { + let hs = self.sto.as_ref().read_hard_state().await?; + let st = self.sto.as_ref().get_log_state().await?; + let mut last_log_id = st.last_log_id; + let (last_applied, _) = self.sto.as_ref().last_applied_state().await?; + let membership = self.get_membership().await?; + + // Clean up dirty state: snapshot is installed but logs are not cleaned. + if last_log_id < last_applied { + self.sto.as_ref().purge_logs_upto(last_applied.unwrap()).await?; + last_log_id = last_applied; + } + + Ok(InitialState { + last_log_id, + last_applied, + hard_state: hs.unwrap_or_default(), + last_membership: membership, + }) + } + + /// Returns the last membership config found in log or state machine. + pub async fn get_membership(&self) -> Result, StorageError> { + let (_, sm_mem) = self.sto.as_ref().last_applied_state().await?; + + let sm_mem_index = match &sm_mem { + None => 0, + Some(mem) => mem.log_id.index, + }; + + let log_mem = self.last_membership_in_log(sm_mem_index + 1).await?; + + if log_mem.is_some() { + return Ok(log_mem); + } + + Ok(sm_mem) + } + + /// Get the latest membership config found in the log. + /// + /// This method should returns membership with the greatest log index which is `>=since_index`. + /// If no such membership log is found, it returns `None`, e.g., when logs are cleaned after being applied. + #[tracing::instrument(level = "trace", skip(self))] + pub async fn last_membership_in_log(&self, since_index: u64) -> Result, StorageError> { + let st = self.sto.as_ref().get_log_state().await?; + + let mut end = st.last_log_id.next_index(); + let start = std::cmp::max(st.last_purged_log_id.next_index(), since_index); + let step = 64; + + while start < end { + let step_start = std::cmp::max(start, end.saturating_sub(step)); + let entries = self.sto.as_ref().try_get_log_entries(step_start..end).await?; + + for ent in entries.iter().rev() { + if let EntryPayload::Membership(ref mem) = ent.payload { + return Ok(Some(EffectiveMembership { + log_id: ent.log_id, + membership: mem.clone(), + })); + } + } + + end = end.saturating_sub(step); + } + + Ok(None) + } + + /// Get a series of log entries from storage. + /// + /// Similar to `try_get_log_entries` except an error will be returned if there is an entry not found in the + /// specified range. + pub async fn get_log_entries + Clone + Debug + Send + Sync>( + &self, + range: RB, + ) -> Result>, StorageError> { + let res = self.sto.as_ref().try_get_log_entries(range.clone()).await?; + + check_range_matches_entries(range, &res)?; + + Ok(res) + } + + /// Try to get an log entry. + /// + /// It does not return an error if the log entry at `log_index` is not found. + pub async fn try_get_log_entry(&self, log_index: u64) -> Result>, StorageError> { + let mut res = self.sto.as_ref().try_get_log_entries(log_index..(log_index + 1)).await?; + Ok(res.pop()) + } + + /// Get the log id of the entry at `index`. + pub async fn get_log_id(&self, log_index: u64) -> Result { + let st = self.sto.as_ref().get_log_state().await?; + + if Some(log_index) == st.last_purged_log_id.index() { + return Ok(st.last_purged_log_id.unwrap()); + } + + let entries = self.get_log_entries(log_index..=log_index).await?; + + Ok(entries[0].log_id) + } +} diff --git a/openraft/src/testing/suite.rs b/openraft/src/testing/suite.rs index 62442ce63..4034b1492 100644 --- a/openraft/src/testing/suite.rs +++ b/openraft/src/testing/suite.rs @@ -1,6 +1,7 @@ use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; +use std::sync::Arc; use maplit::btreeset; @@ -19,6 +20,7 @@ use crate::InitialState; use crate::LogId; use crate::Membership; use crate::RaftStorage; +use crate::StorageHelper; use crate::Violation; const NODE_ID: u64 = 0; @@ -92,9 +94,9 @@ where } pub async fn last_membership_in_log_initial(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); - let membership = store.last_membership_in_log(0).await?; + let membership = StorageHelper::new(&store).last_membership_in_log(0).await?; assert!(membership.is_none()); @@ -102,7 +104,7 @@ where } pub async fn last_membership_in_log(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); tracing::info!("--- no log, do not read membership from state machine"); { @@ -119,7 +121,7 @@ where ]) .await?; - let mem = store.last_membership_in_log(0).await?; + let mem = StorageHelper::new(&store).last_membership_in_log(0).await?; assert!(mem.is_none()); } @@ -133,15 +135,15 @@ where }]) .await?; - let mem = store.last_membership_in_log(0).await?; + let mem = StorageHelper::new(&store).last_membership_in_log(0).await?; let mem = mem.unwrap(); assert_eq!(Membership::new_single(btreeset! {1, 2, 3}), mem.membership,); - let mem = store.last_membership_in_log(1).await?; + let mem = StorageHelper::new(&store).last_membership_in_log(1).await?; let mem = mem.unwrap(); assert_eq!(Membership::new_single(btreeset! {1, 2, 3}), mem.membership,); - let mem = store.last_membership_in_log(2).await?; + let mem = StorageHelper::new(&store).last_membership_in_log(2).await?; assert!(mem.is_none()); } @@ -160,7 +162,7 @@ where ]) .await?; - let mem = store.last_membership_in_log(0).await?; + let mem = StorageHelper::new(&store).last_membership_in_log(0).await?; let mem = mem.unwrap(); assert_eq!(Membership::new_single(btreeset! {7,8,9},), mem.membership,); @@ -168,7 +170,7 @@ where tracing::info!("--- membership presents in log and > sm.last_applied, read from log but since_index is greater than the last"); { - let mem = store.last_membership_in_log(4).await?; + let mem = StorageHelper::new(&store).last_membership_in_log(4).await?; assert!(mem.is_none()); } @@ -176,7 +178,7 @@ where } pub async fn last_membership_in_log_multi_step(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); tracing::info!("--- find membership log entry backwards, multiple steps"); { @@ -205,7 +207,7 @@ where }]) .await?; - let mem = store.last_membership_in_log(0).await?; + let mem = StorageHelper::new(&store).last_membership_in_log(0).await?; assert!(mem.is_some()); assert_eq!(Membership::new_single(btreeset! {5,6,7}), mem.unwrap().membership,); } @@ -214,9 +216,9 @@ where } pub async fn get_membership_initial(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); - let membership = store.get_membership().await?; + let membership = StorageHelper::new(&store).get_membership().await?; assert!(membership.is_none()); @@ -224,7 +226,7 @@ where } pub async fn get_membership_from_log_and_sm(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); tracing::info!("--- no log, read membership from state machine"); { @@ -241,7 +243,7 @@ where ]) .await?; - let mem = store.get_membership().await?; + let mem = StorageHelper::new(&store).get_membership().await?; let mem = mem.unwrap(); assert_eq!(Membership::new_single(btreeset! {3,4,5}), mem.membership,); @@ -256,7 +258,7 @@ where }]) .await?; - let mem = store.get_membership().await?; + let mem = StorageHelper::new(&store).get_membership().await?; let mem = mem.unwrap(); @@ -272,7 +274,7 @@ where }]) .await?; - let mem = store.get_membership().await?; + let mem = StorageHelper::new(&store).get_membership().await?; let mem = mem.unwrap(); @@ -283,15 +285,15 @@ where } pub async fn get_initial_state_without_init(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); - let initial = store.get_initial_state().await?; + let initial = StorageHelper::new(&store).get_initial_state().await?; assert_eq!(InitialState::default(), initial, "uninitialized state"); Ok(()) } pub async fn get_initial_state_with_state(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); Self::default_hard_state(&store).await?; store @@ -308,7 +310,7 @@ where }]) .await?; - let initial = store.get_initial_state().await?; + let initial = StorageHelper::new(&store).get_initial_state().await?; assert_eq!( initial.last_log_id, @@ -334,7 +336,7 @@ where pub async fn get_initial_state_membership_from_log_and_sm(builder: &B) -> anyhow::Result<()> { // It should never return membership from logs that are included in state machine present. - let store = builder.build().await; + let store = Arc::new(builder.build().await); Self::default_hard_state(&store).await?; // copy the test from get_membership_config @@ -354,7 +356,7 @@ where ]) .await?; - let initial = store.get_initial_state().await?; + let initial = StorageHelper::new(&store).get_initial_state().await?; assert_eq!( Membership::new_single(btreeset! {3,4,5}), @@ -371,7 +373,7 @@ where }]) .await?; - let initial = store.get_initial_state().await?; + let initial = StorageHelper::new(&store).get_initial_state().await?; assert_eq!( Membership::new_single(btreeset! {3,4,5}), @@ -388,7 +390,7 @@ where }]) .await?; - let initial = store.get_initial_state().await?; + let initial = StorageHelper::new(&store).get_initial_state().await?; assert_eq!( Membership::new_single(btreeset! {1,2,3}), @@ -400,7 +402,7 @@ where } pub async fn get_initial_state_last_log_gt_sm(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); Self::default_hard_state(&store).await?; store @@ -423,7 +425,7 @@ where ]) .await?; - let initial = store.get_initial_state().await?; + let initial = StorageHelper::new(&store).get_initial_state().await?; assert_eq!( initial.last_log_id, @@ -434,14 +436,14 @@ where } pub async fn get_initial_state_last_log_lt_sm(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); Self::default_hard_state(&store).await?; store.append_to_log(&[&blank(1, 2)]).await?; store.apply_to_state_machine(&[&blank(3, 1)]).await?; - let initial = store.get_initial_state().await?; + let initial = StorageHelper::new(&store).get_initial_state().await?; assert_eq!( initial.last_log_id, @@ -452,7 +454,7 @@ where } pub async fn save_hard_state(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); store .save_hard_state(&HardState { @@ -474,18 +476,18 @@ where } pub async fn get_log_entries(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); Self::feed_10_logs_vote_self(&store).await?; tracing::info!("--- get start == stop"); { - let logs = store.get_log_entries(3..3).await?; + let logs = StorageHelper::new(&store).get_log_entries(3..3).await?; assert_eq!(logs.len(), 0, "expected no logs to be returned"); } tracing::info!("--- get start < stop"); { - let logs = store.get_log_entries(5..7).await?; + let logs = StorageHelper::new(&store).get_log_entries(5..7).await?; assert_eq!(logs.len(), 2); assert_eq!(logs[0].log_id, (1, 5).into()); @@ -496,34 +498,34 @@ where } pub async fn try_get_log_entry(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); Self::feed_10_logs_vote_self(&store).await?; store.purge_logs_upto(LogId::new(0, 0)).await?; - let ent = store.try_get_log_entry(3).await?; + let ent = StorageHelper::new(&store).try_get_log_entry(3).await?; assert_eq!(Some(LogId { term: 1, index: 3 }), ent.map(|x| x.log_id)); - let ent = store.try_get_log_entry(0).await?; + let ent = StorageHelper::new(&store).try_get_log_entry(0).await?; assert_eq!(None, ent.map(|x| x.log_id)); - let ent = store.try_get_log_entry(11).await?; + let ent = StorageHelper::new(&store).try_get_log_entry(11).await?; assert_eq!(None, ent.map(|x| x.log_id)); Ok(()) } pub async fn initial_logs(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); - let ent = store.try_get_log_entry(0).await?; + let ent = StorageHelper::new(&store).try_get_log_entry(0).await?; assert!(ent.is_none(), "store initialized"); Ok(()) } pub async fn get_log_state(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); let st = store.get_log_state().await?; @@ -570,28 +572,28 @@ where } pub async fn get_log_id(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); Self::feed_10_logs_vote_self(&store).await?; store.purge_logs_upto(LogId::new(1, 3)).await?; - let res = store.get_log_id(0).await; + let res = StorageHelper::new(&store).get_log_id(0).await; assert!(res.is_err()); - let res = store.get_log_id(11).await; + let res = StorageHelper::new(&store).get_log_id(11).await; assert!(res.is_err()); - let res = store.get_log_id(3).await?; + let res = StorageHelper::new(&store).get_log_id(3).await?; assert_eq!(LogId::new(1, 3), res); - let res = store.get_log_id(4).await?; + let res = StorageHelper::new(&store).get_log_id(4).await?; assert_eq!(LogId::new(1, 4), res); Ok(()) } pub async fn last_id_in_log(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); let log_id = store.get_log_state().await?.last_log_id; assert_eq!(None, log_id); @@ -628,7 +630,7 @@ where } pub async fn last_applied_state(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); let (applied, membership) = store.last_applied_state().await?; assert_eq!(None, applied); @@ -680,7 +682,7 @@ where pub async fn delete_logs(builder: &B) -> anyhow::Result<()> { tracing::info!("--- delete (-oo, 0]"); { - let store = builder.build().await; + let store = Arc::new(builder.build().await); Self::feed_10_logs_vote_self(&store).await?; store.purge_logs_upto(LogId::new(0, 0)).await?; @@ -700,7 +702,7 @@ where tracing::info!("--- delete (-oo, 5]"); { - let store = builder.build().await; + let store = Arc::new(builder.build().await); Self::feed_10_logs_vote_self(&store).await?; store.purge_logs_upto(LogId::new(1, 5)).await?; @@ -720,7 +722,7 @@ where tracing::info!("--- delete (-oo, 20]"); { - let store = builder.build().await; + let store = Arc::new(builder.build().await); Self::feed_10_logs_vote_self(&store).await?; store.purge_logs_upto(LogId::new(1, 20)).await?; @@ -739,7 +741,7 @@ where tracing::info!("--- delete [11, +oo)"); { - let store = builder.build().await; + let store = Arc::new(builder.build().await); Self::feed_10_logs_vote_self(&store).await?; store.delete_conflict_logs_since(LogId::new(1, 11)).await?; @@ -758,7 +760,7 @@ where tracing::info!("--- delete [0, +oo)"); { - let store = builder.build().await; + let store = Arc::new(builder.build().await); Self::feed_10_logs_vote_self(&store).await?; store.delete_conflict_logs_since(LogId::new(0, 0)).await?; @@ -779,7 +781,7 @@ where } pub async fn append_to_log(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); Self::feed_10_logs_vote_self(&store).await?; store.purge_logs_upto(LogId::new(0, 0)).await?; @@ -795,7 +797,7 @@ where } // pub async fn apply_single(builder: &B) -> anyhow::Result<()> { - // let store = builder.build().await; + // let store = Arc::new(builder.build().await); // // let entry = Entry { // log_id: LogId { term: 3, index: 1 }, @@ -832,7 +834,7 @@ where // } // // pub async fn apply_multi(builder: &B) -> anyhow::Result<()> { - // let store = builder.build().await; + // let store = Arc::new(builder.build().await); // // let req0 = ClientRequest { // client: "1".into(), @@ -966,7 +968,7 @@ where } pub async fn df_get_membership_config_dirty_log(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); tracing::info!("--- dirty log: log.index > last_applied.index && log < last_applied"); { @@ -1007,7 +1009,7 @@ where ]) .await?; - let res = store.get_membership().await; + let res = StorageHelper::new(&store).get_membership().await; let e = res.unwrap_err().into_defensive().unwrap(); assert!(matches!(e, DefensiveError { @@ -1024,7 +1026,7 @@ where } pub async fn df_get_initial_state_dirty_log(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); tracing::info!("--- dirty log: log.index > last_applied.index && log < last_applied"); { @@ -1042,7 +1044,7 @@ where }]) .await?; - let state = store.get_initial_state().await; + let state = StorageHelper::new(&store).get_initial_state().await; let e = state.unwrap_err().into_defensive().unwrap(); assert!(matches!(e, DefensiveError { @@ -1059,7 +1061,7 @@ where } pub async fn df_save_hard_state_ascending(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); store .save_hard_state(&HardState { @@ -1171,21 +1173,21 @@ where } pub async fn df_get_log_entries(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); Self::feed_10_logs_vote_self(&store).await?; store.apply_to_state_machine(&[&blank(0, 0)]).await?; store.purge_logs_upto(LogId::new(0, 0)).await?; - store.get_log_entries(..).await?; - store.get_log_entries(5..).await?; - store.get_log_entries(..5).await?; - store.get_log_entries(5..7).await?; + StorageHelper::new(&store).get_log_entries(..).await?; + StorageHelper::new(&store).get_log_entries(5..).await?; + StorageHelper::new(&store).get_log_entries(..5).await?; + StorageHelper::new(&store).get_log_entries(5..7).await?; // mismatched bound. - let res = store.get_log_entries(11..).await; + let res = StorageHelper::new(&store).get_log_entries(11..).await; let e = res.unwrap_err().into_defensive().unwrap(); assert!(matches!(e, DefensiveError { subject: ErrorSubject::LogIndex(11), @@ -1193,7 +1195,7 @@ where .. })); - let res = store.get_log_entries(1..1).await; + let res = StorageHelper::new(&store).get_log_entries(1..1).await; let e = res.unwrap_err().into_defensive().unwrap(); assert!(matches!(e, DefensiveError { subject: ErrorSubject::Logs, @@ -1204,7 +1206,7 @@ where .. })); - let res = store.get_log_entries(0..1).await; + let res = StorageHelper::new(&store).get_log_entries(0..1).await; let e = res.unwrap_err().into_defensive().unwrap(); assert!(matches!(e, DefensiveError { subject: ErrorSubject::LogIndex(0), @@ -1212,7 +1214,7 @@ where .. })); - let res = store.get_log_entries(0..2).await; + let res = StorageHelper::new(&store).get_log_entries(0..2).await; let e = res.unwrap_err().into_defensive().unwrap(); assert!(matches!(e, DefensiveError { subject: ErrorSubject::LogIndex(0), @@ -1220,7 +1222,7 @@ where .. })); - let res = store.get_log_entries(10..12).await; + let res = StorageHelper::new(&store).get_log_entries(10..12).await; let e = res.unwrap_err().into_defensive().unwrap(); assert!(matches!(e, DefensiveError { subject: ErrorSubject::LogIndex(11), @@ -1235,7 +1237,7 @@ where } pub async fn df_append_to_log_nonempty_input(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); let res = store.append_to_log(Vec::<&Entry<_>>::new().as_slice()).await; @@ -1247,7 +1249,7 @@ where } pub async fn df_append_to_log_nonconsecutive_input(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); let res = store .append_to_log(&[ @@ -1276,7 +1278,7 @@ where } pub async fn df_append_to_log_eq_last_plus_one(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); tracing::info!("-- log_id <= last_applied"); tracing::info!("-- nonconsecutive log"); @@ -1305,7 +1307,7 @@ where // last_log: 1,1 // last_applied: 1,2 // append_to_log: 1,4 - let store = builder.build().await; + let store = Arc::new(builder.build().await); tracing::info!("-- log_id <= last_applied"); tracing::info!("-- nonconsecutive log"); @@ -1333,7 +1335,7 @@ where pub async fn df_append_to_log_gt_last_log_id(builder: &B) -> anyhow::Result<()> { // last_log: 2,2 // append_to_log: 1,3: index == last + 1 but term is lower - let store = builder.build().await; + let store = Arc::new(builder.build().await); store.append_to_log(&[&blank(0, 0), &blank(2, 1), &blank(2, 2)]).await?; @@ -1356,7 +1358,7 @@ where // last_log: 2,1 // last_applied: 2,2 // append_to_log: 1,3: index == last + 1 but term is lower - let store = builder.build().await; + let store = Arc::new(builder.build().await); store.append_to_log(&[&blank(0, 0), &blank(2, 1), &blank(2, 2)]).await?; @@ -1380,7 +1382,7 @@ where } pub async fn df_apply_nonempty_input(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); let res = store.apply_to_state_machine(Vec::<&Entry<_>>::new().as_slice()).await; @@ -1392,7 +1394,7 @@ where } pub async fn df_apply_index_eq_last_applied_plus_one(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); let entry = blank(3, 1); @@ -1433,7 +1435,7 @@ where } pub async fn df_apply_gt_last_applied_id(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); let entry = blank(3, 1); @@ -1460,7 +1462,7 @@ where } pub async fn df_purge_applied_le_last_applied(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); store.apply_to_state_machine(&[&blank(0, 0), &blank(3, 1)]).await?; @@ -1483,7 +1485,7 @@ where } pub async fn df_delete_conflict_gt_last_applied(builder: &B) -> anyhow::Result<()> { - let store = builder.build().await; + let store = Arc::new(builder.build().await); store.apply_to_state_machine(&[&blank(0, 0), &blank(3, 1)]).await?; diff --git a/openraft/src/types/v070/storage.rs b/openraft/src/types/v070/storage.rs index 9506306fb..1f56eb862 100644 --- a/openraft/src/types/v070/storage.rs +++ b/openraft/src/types/v070/storage.rs @@ -13,16 +13,12 @@ use super::AppData; use super::AppDataResponse; use super::EffectiveMembership; use super::Entry; -use super::EntryPayload; use super::HardState; -use super::InitialState; use super::LogId; use super::Snapshot; use super::SnapshotMeta; use super::StateMachineChanges; use super::StorageError; -use crate::defensive::check_range_matches_entries; -use crate::LogIdOptionExt; /// A trait defining the interface for a Raft storage system. /// @@ -40,116 +36,6 @@ where /// for details on where and how this is used. type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static; - /// Returns the last membership config found in log or state machine. - async fn get_membership(&self) -> Result, StorageError> { - let (_, sm_mem) = self.last_applied_state().await?; - - let sm_mem_index = match &sm_mem { - None => 0, - Some(mem) => mem.log_id.index, - }; - - let log_mem = self.last_membership_in_log(sm_mem_index + 1).await?; - - if log_mem.is_some() { - return Ok(log_mem); - } - - return Ok(sm_mem); - } - - /// Get the latest membership config found in the log. - /// - /// This method should returns membership with the greatest log index which is `>=since_index`. - /// If no such membership log is found, it returns `None`, e.g., when logs are cleaned after being applied. - #[tracing::instrument(level = "trace", skip(self))] - async fn last_membership_in_log(&self, since_index: u64) -> Result, StorageError> { - let st = self.get_log_state().await?; - - let mut end = st.last_log_id.next_index(); - let start = std::cmp::max(st.last_purged_log_id.next_index(), since_index); - let step = 64; - - while start < end { - let step_start = std::cmp::max(start, end.saturating_sub(step)); - let entries = self.try_get_log_entries(step_start..end).await?; - - for ent in entries.iter().rev() { - if let EntryPayload::Membership(ref mem) = ent.payload { - return Ok(Some(EffectiveMembership { - log_id: ent.log_id, - membership: mem.clone(), - })); - } - } - - end = end.saturating_sub(step); - } - - Ok(None) - } - - /// Get Raft's state information from storage. - /// - /// When the Raft node is first started, it will call this interface to fetch the last known state from stable - /// storage. - async fn get_initial_state(&self) -> Result { - let hs = self.read_hard_state().await?; - let st = self.get_log_state().await?; - let mut last_log_id = st.last_log_id; - let (last_applied, _) = self.last_applied_state().await?; - let membership = self.get_membership().await?; - - // Clean up dirty state: snapshot is installed but logs are not cleaned. - if last_log_id < last_applied { - self.purge_logs_upto(last_applied.unwrap()).await?; - last_log_id = last_applied; - } - - Ok(InitialState { - last_log_id, - last_applied, - hard_state: hs.unwrap_or_default(), - last_membership: membership, - }) - } - - /// Get a series of log entries from storage. - /// - /// Similar to `try_get_log_entries` except an error will be returned if there is an entry not found in the - /// specified range. - async fn get_log_entries + Clone + Debug + Send + Sync>( - &self, - range: RB, - ) -> Result>, StorageError> { - let res = self.try_get_log_entries(range.clone()).await?; - - check_range_matches_entries(range, &res)?; - - Ok(res) - } - - /// Try to get an log entry. - /// - /// It does not return an error if the log entry at `log_index` is not found. - async fn try_get_log_entry(&self, log_index: u64) -> Result>, StorageError> { - let mut res = self.try_get_log_entries(log_index..(log_index + 1)).await?; - Ok(res.pop()) - } - - /// Get the log id of the entry at `index`. - async fn get_log_id(&self, log_index: u64) -> Result { - let st = self.get_log_state().await?; - - if Some(log_index) == st.last_purged_log_id.index() { - return Ok(st.last_purged_log_id.unwrap()); - } - - let entries = self.get_log_entries(log_index..=log_index).await?; - - Ok(entries[0].log_id) - } - // --- Hard State async fn save_hard_state(&self, hs: &HardState) -> Result<(), StorageError>; diff --git a/openraft/tests/append_entries/t20_append_conflicts.rs b/openraft/tests/append_entries/t20_append_conflicts.rs index 6a2427d54..b5a3a336b 100644 --- a/openraft/tests/append_entries/t20_append_conflicts.rs +++ b/openraft/tests/append_entries/t20_append_conflicts.rs @@ -13,6 +13,7 @@ use openraft::LogId; use openraft::MessageSummary; use openraft::RaftStorage; use openraft::State; +use openraft::StorageHelper; use crate::fixtures::blank; use crate::fixtures::RaftRouter; @@ -230,7 +231,7 @@ where R: AppDataResponse, Sto: RaftStorage, { - let logs = sto.get_log_entries(..).await?; + let logs = StorageHelper::new(sto).get_log_entries(..).await?; let skip = 0; let want: Vec> = terms .iter() diff --git a/openraft/tests/append_entries/t30_append_inconsistent_log.rs b/openraft/tests/append_entries/t30_append_inconsistent_log.rs index bd2c8fb84..9d24262d6 100644 --- a/openraft/tests/append_entries/t30_append_inconsistent_log.rs +++ b/openraft/tests/append_entries/t30_append_inconsistent_log.rs @@ -10,6 +10,7 @@ use openraft::Config; use openraft::LogId; use openraft::RaftStorage; use openraft::State; +use openraft::StorageHelper; use crate::fixtures::RaftRouter; @@ -121,7 +122,7 @@ async fn append_inconsistent_log() -> Result<()> { .metrics(|x| x.last_log_index == Some(n_logs), "sync log to node 0") .await?; - let logs = sto0.get_log_entries(60..=60).await?; + let logs = StorageHelper::new(&sto0).get_log_entries(60..=60).await?; assert_eq!(3, logs.first().unwrap().log_id.term, "log is overridden by leader logs"); Ok(()) diff --git a/openraft/tests/initialization.rs b/openraft/tests/initialization.rs index cbb0f461b..ff4a4598a 100644 --- a/openraft/tests/initialization.rs +++ b/openraft/tests/initialization.rs @@ -11,6 +11,7 @@ use openraft::LogId; use openraft::Membership; use openraft::RaftStorage; use openraft::State; +use openraft::StorageHelper; #[macro_use] mod fixtures; @@ -54,7 +55,7 @@ async fn initialization() -> Result<()> { for i in 0..3 { let sto = router.get_storage_handle(&1).await?; - let first = sto.get_log_entries(0..2).await?.first().cloned(); + let first = StorageHelper::new(&sto).get_log_entries(0..2).await?.first().cloned(); tracing::info!("--- check membership is replicated: id: {}, first log: {:?}", i, first); let mem = match first.unwrap().payload { diff --git a/openraft/tests/log_compaction/compaction.rs b/openraft/tests/log_compaction/compaction.rs index e4f2b5702..97028f2bb 100644 --- a/openraft/tests/log_compaction/compaction.rs +++ b/openraft/tests/log_compaction/compaction.rs @@ -13,6 +13,7 @@ use openraft::RaftNetwork; use openraft::RaftStorage; use openraft::SnapshotPolicy; use openraft::State; +use openraft::StorageHelper; use crate::fixtures::blank; use crate::fixtures::RaftRouter; @@ -115,7 +116,7 @@ async fn compaction() -> Result<()> { tracing::info!("--- logs should be deleted after installing snapshot; left only the last one"); { let sto = router.get_storage_handle(&1).await?; - let logs = sto.get_log_entries(..).await?; + let logs = StorageHelper::new(&sto).get_log_entries(..).await?; assert_eq!(1, logs.len()); assert_eq!(LogId { term: 1, index: 50 }, logs[0].log_id) } diff --git a/openraft/tests/membership/t10_add_learner.rs b/openraft/tests/membership/t10_add_learner.rs index 068632a57..44bec69d5 100644 --- a/openraft/tests/membership/t10_add_learner.rs +++ b/openraft/tests/membership/t10_add_learner.rs @@ -6,7 +6,7 @@ use maplit::btreeset; use openraft::raft::AddLearnerResponse; use openraft::Config; use openraft::LogId; -use openraft::RaftStorage; +use openraft::StorageHelper; use crate::fixtures::RaftRouter; @@ -60,7 +60,7 @@ async fn add_learner_basic() -> Result<()> { tracing::info!("--- add_learner blocks until the replication catches up"); let sto1 = router.get_storage_handle(&1).await?; - let logs = sto1.get_log_entries(..).await?; + let logs = StorageHelper::new(&sto1).get_log_entries(..).await?; assert_eq!(log_index, logs[logs.len() - 1].log_id.index); // 0-th log diff --git a/openraft/tests/membership/t20_change_membership.rs b/openraft/tests/membership/t20_change_membership.rs index 81ddf6417..4362b5443 100644 --- a/openraft/tests/membership/t20_change_membership.rs +++ b/openraft/tests/membership/t20_change_membership.rs @@ -5,7 +5,7 @@ use std::time::Duration; use maplit::btreeset; use openraft::error::ChangeMembershipError; use openraft::Config; -use openraft::RaftStorage; +use openraft::StorageHelper; use crate::fixtures::RaftRouter; @@ -47,7 +47,7 @@ async fn change_with_new_learner_blocking() -> anyhow::Result<()> { for node_id in 0..2 { let sto = router.get_storage_handle(&node_id).await?; - let logs = sto.get_log_entries(..).await?; + let logs = StorageHelper::new(&sto).get_log_entries(..).await?; assert_eq!(n_logs, logs[logs.len() - 1].log_id.index, "node: {}", node_id); // 0-th log assert_eq!(n_logs + 1, logs.len() as u64, "node: {}", node_id); diff --git a/openraft/tests/snapshot/snapshot_overrides_membership.rs b/openraft/tests/snapshot/snapshot_overrides_membership.rs index 15be989f5..8a72f70ea 100644 --- a/openraft/tests/snapshot/snapshot_overrides_membership.rs +++ b/openraft/tests/snapshot/snapshot_overrides_membership.rs @@ -10,8 +10,8 @@ use openraft::Config; use openraft::LogId; use openraft::Membership; use openraft::RaftNetwork; -use openraft::RaftStorage; use openraft::SnapshotPolicy; +use openraft::StorageHelper; use crate::fixtures::blank; use crate::fixtures::RaftRouter; @@ -104,7 +104,7 @@ async fn snapshot_overrides_membership() -> Result<()> { tracing::info!("--- check that learner membership is affected"); { - let m = sto.get_membership().await?; + let m = StorageHelper::new(&sto).get_membership().await?; let m = m.unwrap(); @@ -146,7 +146,7 @@ async fn snapshot_overrides_membership() -> Result<()> { ) .await?; - let m = sto.get_membership().await?; + let m = StorageHelper::new(&sto).get_membership().await?; let m = m.unwrap(); diff --git a/openraft/tests/snapshot/snapshot_uses_prev_snap_membership.rs b/openraft/tests/snapshot/snapshot_uses_prev_snap_membership.rs index 9257d6fda..c0d7488bb 100644 --- a/openraft/tests/snapshot/snapshot_uses_prev_snap_membership.rs +++ b/openraft/tests/snapshot/snapshot_uses_prev_snap_membership.rs @@ -6,8 +6,8 @@ use maplit::btreeset; use openraft::Config; use openraft::LogId; use openraft::Membership; -use openraft::RaftStorage; use openraft::SnapshotPolicy; +use openraft::StorageHelper; use crate::fixtures::RaftRouter; @@ -72,10 +72,10 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { .await?; { - let logs = sto0.get_log_entries(..).await?; + let logs = StorageHelper::new(&sto0).get_log_entries(..).await?; assert_eq!(2, logs.len(), "only one applied log is kept"); } - let m = sto0.get_membership().await?; + let m = StorageHelper::new(&sto0).get_membership().await?; let m = m.unwrap(); @@ -121,10 +121,10 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { tracing::info!("--- check membership"); { { - let logs = sto0.get_log_entries(..).await?; + let logs = StorageHelper::new(&sto0).get_log_entries(..).await?; assert_eq!(2, logs.len(), "only one applied log"); } - let m = sto0.get_membership().await?; + let m = StorageHelper::new(&sto0).get_membership().await?; let m = m.unwrap(); diff --git a/openraft/tests/state_machine/t40_clean_applied_logs.rs b/openraft/tests/state_machine/t40_clean_applied_logs.rs index 5f23f6ef5..43aa45b8a 100644 --- a/openraft/tests/state_machine/t40_clean_applied_logs.rs +++ b/openraft/tests/state_machine/t40_clean_applied_logs.rs @@ -4,7 +4,7 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; use openraft::Config; -use openraft::RaftStorage; +use openraft::StorageHelper; use tokio::time::sleep; use crate::fixtures::RaftRouter; @@ -46,7 +46,7 @@ async fn clean_applied_logs() -> Result<()> { { for node_id in 0..1 { let sto = router.get_storage_handle(&node_id).await?; - let logs = sto.get_log_entries(..).await?; + let logs = StorageHelper::new(&sto).get_log_entries(..).await?; assert_eq!(2, logs.len(), "node {} should have only {} logs", node_id, 2); } }