diff --git a/async-raft/src/storage.rs b/async-raft/src/storage.rs index 6f7ac2444..f197afe8b 100644 --- a/async-raft/src/storage.rs +++ b/async-raft/src/storage.rs @@ -46,7 +46,7 @@ where S: AsyncRead + AsyncSeek + Send + Unpin + 'static /// /// This model derives serde's traits for easily (de)serializing this /// model for storage & retrieval. -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)] pub struct HardState { /// The last recorded term observed by this system. pub current_term: u64, @@ -112,6 +112,13 @@ where /// For all other methods of this trait, returning an error will cause Raft to shutdown. type ShutdownError: Error + Send + Sync + 'static; + /// Set if to turn on defensive check to unexpected input. + /// E.g. discontinuous log appending. + /// The default impl returns `false` to indicate it does impl any defensive check. + async fn defensive(&self, _d: bool) -> bool { + false + } + /// Get the latest membership config found in the log. /// /// This must always be implemented as a reverse search through the log to find the most diff --git a/memstore/Cargo.toml b/memstore/Cargo.toml index 5117fd488..45443f3ce 100644 --- a/memstore/Cargo.toml +++ b/memstore/Cargo.toml @@ -15,6 +15,7 @@ readme = "README.md" [dependencies] anyhow = "1.0.32" async-raft = { version="0.6", path="../async-raft" } +async-trait = "0.1.36" serde = { version="1.0.114", features=["derive"] } serde_json = "1.0.57" thiserror = "1.0.20" diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index f94611878..cca9abfda 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -85,6 +85,9 @@ pub struct MemStoreStateMachine { /// An in-memory storage system implementing the `async_raft::RaftStorage` trait. pub struct MemStore { + /// Turn on defensive check for inputs. + defensive: RwLock, + /// The ID of the Raft node for which this memory storage instances is configured. id: NodeId, /// The Raft log. @@ -107,6 +110,7 @@ impl MemStore { let hs = RwLock::new(None); let current_snapshot = RwLock::new(None); Self { + defensive: RwLock::new(false), id, log, sm, @@ -130,6 +134,7 @@ impl MemStore { let hs = RwLock::new(hs); let current_snapshot = RwLock::new(current_snapshot); Self { + defensive: RwLock::new(false), id, log, sm, @@ -140,6 +145,181 @@ impl MemStore { } } +// TODO(xp): elaborate errors +impl MemStore { + /// Ensure that logs that have greater index than last_applied should have greater log_id. + /// Invariant must hold: `log.log_id.index > last_applied.index` implies `log.log_id > last_applied`. + pub async fn defensive_no_dirty_log(&self) -> anyhow::Result<()> { + if !*self.defensive.read().await { + return Ok(()); + } + + let log = self.log.read().await; + let sm = self.sm.read().await; + let last_log_id = log.iter().last().map(|(_index, entry)| entry.log_id).unwrap_or_default(); + let last_applied = sm.last_applied_log; + + if last_log_id.index > last_applied.index && last_log_id < last_applied { + return Err(anyhow::anyhow!("greater index log is smaller than last_applied")); + } + + Ok(()) + } + + /// Ensure that current_term must increment for every update, and for every term there could be only one value for + /// voted_for. + pub async fn defensive_incremental_hard_state(&self, hs: &HardState) -> anyhow::Result<()> { + if !*self.defensive.read().await { + return Ok(()); + } + + let h = self.hs.write().await; + let curr = h.clone().unwrap_or_default(); + if hs.current_term < curr.current_term { + return Err(anyhow::anyhow!("smaller term is now allowed")); + } + + if hs.current_term == curr.current_term && hs.voted_for != curr.voted_for { + return Err(anyhow::anyhow!("voted_for can not change in one term")); + } + + Ok(()) + } + + pub async fn defensive_consecutive_input(&self, entries: &[&Entry]) -> anyhow::Result<()> { + if !*self.defensive.read().await { + return Ok(()); + } + + if entries.is_empty() { + return Ok(()); + } + + let mut prev_log_id = entries[0].log_id; + + for e in entries.iter().skip(1) { + if e.log_id.index != prev_log_id.index + 1 { + return Err(anyhow::anyhow!( + "nonconsecutive input log index: {}, {}", + prev_log_id, + e.log_id + )); + } + + prev_log_id = e.log_id; + } + + Ok(()) + } + + pub async fn defensive_nonempty_input(&self, entries: &[&Entry]) -> anyhow::Result<()> { + if !*self.defensive.read().await { + return Ok(()); + } + + if entries.is_empty() { + return Err(anyhow::anyhow!("append empty entries")); + } + + Ok(()) + } + + pub async fn defensive_append_log_index_is_last_plus_one( + &self, + entries: &[&Entry], + ) -> anyhow::Result<()> { + if !*self.defensive.read().await { + return Ok(()); + } + + let last_id = self.last_log_id().await; + + let first_id = entries[0].log_id; + if last_id.index + 1 != first_id.index { + return Err(anyhow::anyhow!( + "first input log index({}) is not last({}) + 1", + first_id.index, + last_id.index, + )); + } + + Ok(()) + } + + pub async fn defensive_append_log_id_gt_last(&self, entries: &[&Entry]) -> anyhow::Result<()> { + if !*self.defensive.read().await { + return Ok(()); + } + + let last_id = self.last_log_id().await; + + let first_id = entries[0].log_id; + if first_id < last_id { + return Err(anyhow::anyhow!( + "first input log id({}) is not > last id({})", + first_id, + last_id, + )); + } + + Ok(()) + } + + /// Find the last known log id from log or state machine + /// If no log id found, the default one `0,0` is returned. + pub async fn last_log_id(&self) -> LogId { + let log_last_id = { + let log_last = self.log.read().await; + log_last.iter().last().map(|(_k, v)| v.log_id).unwrap_or_default() + }; + + let sm_last_id = self.sm.read().await.last_applied_log; + + std::cmp::max(log_last_id, sm_last_id) + } + + pub async fn defensive_apply_index_is_last_applied_plus_one( + &self, + entries: &[&Entry], + ) -> anyhow::Result<()> { + if !*self.defensive.read().await { + return Ok(()); + } + + let last_id = self.sm.read().await.last_applied_log; + + let first_id = entries[0].log_id; + if last_id.index + 1 != first_id.index { + return Err(anyhow::anyhow!( + "first input log index({}) is not last({}) + 1", + first_id.index, + last_id.index, + )); + } + + Ok(()) + } + + pub async fn defensive_apply_log_id_gt_last(&self, entries: &[&Entry]) -> anyhow::Result<()> { + if !*self.defensive.read().await { + return Ok(()); + } + + let last_id = self.sm.read().await.last_applied_log; + + let first_id = entries[0].log_id; + if first_id < last_id { + return Err(anyhow::anyhow!( + "first input log id({}) is not > last id({})", + first_id, + last_id, + )); + } + + Ok(()) + } +} + #[async_trait] impl RaftStorageDebug for MemStore { /// Get a handle to the state machine for testing purposes. @@ -168,6 +348,8 @@ impl MemStore { /// Go backwards through the log to find the most recent membership config <= `upto_index`. #[tracing::instrument(level = "trace", skip(self))] pub async fn get_membership_from_log(&self, upto_index: Option) -> Result { + self.defensive_no_dirty_log().await?; + let membership = { let log = self.log.read().await; @@ -213,6 +395,12 @@ impl RaftStorage for MemStore { type SnapshotData = Cursor>; type ShutdownError = ShutdownError; + async fn defensive(&self, d: bool) -> bool { + let mut defensive_flag = self.defensive.write().await; + *defensive_flag = d; + d + } + #[tracing::instrument(level = "trace", skip(self))] async fn get_membership_config(&self) -> Result { self.get_membership_from_log(None).await @@ -220,6 +408,8 @@ impl RaftStorage for MemStore { #[tracing::instrument(level = "trace", skip(self))] async fn get_initial_state(&self) -> Result { + self.defensive_no_dirty_log().await?; + let membership = self.get_membership_config().await?; let mut hs = self.hs.write().await; let log = self.log.read().await; @@ -257,7 +447,11 @@ impl RaftStorage for MemStore { #[tracing::instrument(level = "trace", skip(self))] async fn save_hard_state(&self, hs: &HardState) -> Result<()> { - *self.hs.write().await = Some(hs.clone()); + self.defensive_incremental_hard_state(hs).await?; + + let mut h = self.hs.write().await; + + *h = Some(hs.clone()); Ok(()) } @@ -295,6 +489,11 @@ impl RaftStorage for MemStore { #[tracing::instrument(level = "trace", skip(self, entries))] async fn append_to_log(&self, entries: &[&Entry]) -> Result<()> { + self.defensive_nonempty_input(entries).await?; + self.defensive_consecutive_input(entries).await?; + self.defensive_append_log_index_is_last_plus_one(entries).await?; + self.defensive_append_log_id_gt_last(entries).await?; + let mut log = self.log.write().await; for entry in entries { log.insert(entry.log_id.index, (*entry).clone()); @@ -304,15 +503,16 @@ impl RaftStorage for MemStore { #[tracing::instrument(level = "trace", skip(self, entries))] async fn apply_to_state_machine(&self, entries: &[&Entry]) -> Result> { + self.defensive_nonempty_input(entries).await?; + self.defensive_apply_index_is_last_applied_plus_one(entries).await?; + self.defensive_apply_log_id_gt_last(entries).await?; + let mut sm = self.sm.write().await; let mut res = Vec::with_capacity(entries.len()); for entry in entries { tracing::debug!("id:{} replicate to sm index:{}", self.id, entry.log_id.index); - // TODO(xp) return error if there is out of order apply - assert_eq!(sm.last_applied_log.index + 1, entry.log_id.index); - sm.last_applied_log = entry.log_id; match entry.payload { diff --git a/memstore/src/test.rs b/memstore/src/test.rs index d12f43888..55de3f01b 100644 --- a/memstore/src/test.rs +++ b/memstore/src/test.rs @@ -3,25 +3,61 @@ use std::marker::PhantomData; use async_raft::raft::EntryConfigChange; use async_raft::raft::EntryNormal; +use async_trait::async_trait; use maplit::btreeset; use super::*; const NODE_ID: u64 = 0; -pub trait StoreBuilder: Clone { - fn new_store(&self, id: NodeId) -> S; +#[async_trait] +pub trait StoreBuilder: Send + Sync +where + D: AppData, + R: AppDataResponse, + S: RaftStorage, +{ + async fn new_store(&self, id: NodeId) -> S; } -#[derive(Clone)] struct MemStoreBuilder {} -impl StoreBuilder for MemStoreBuilder { - fn new_store(&self, id: NodeId) -> MemStore { +#[async_trait] +impl StoreBuilder for MemStoreBuilder { + async fn new_store(&self, id: NodeId) -> MemStore { MemStore::new(id) } } +struct DefensiveBuilder +where + D: AppData, + R: AppDataResponse, + S: RaftStorage, + B: StoreBuilder, +{ + inner: B, + d: PhantomData, + r: PhantomData, + s: PhantomData, +} + +#[async_trait] +impl StoreBuilder for DefensiveBuilder +where + D: AppData, + R: AppDataResponse, + S: RaftStorage, + B: StoreBuilder, +{ + async fn new_store(&self, id: NodeId) -> S { + let dsto = self.inner.new_store(id).await; + let d = dsto.defensive(true).await; + assert!(d, "inner must impl defensive check"); + dsto + } +} + #[test] pub fn test_mem_store() -> Result<()> { Suite::test_store(&MemStoreBuilder {})?; @@ -29,6 +65,18 @@ pub fn test_mem_store() -> Result<()> { Ok(()) } +#[test] +pub fn test_mem_store_defensive() -> Result<()> { + Suite::test_store_defensive(&DefensiveBuilder { + inner: MemStoreBuilder {}, + d: std::marker::PhantomData, + r: std::marker::PhantomData, + s: std::marker::PhantomData, + })?; + + Ok(()) +} + /// Block until a future is finished. /// The future will be running in a clean tokio runtime, to prevent an unfinished task affecting the test. pub fn run_fut(f: F) -> Result<()> @@ -42,7 +90,7 @@ where F: Future> { struct Suite where S: RaftStorageDebug + RaftStorage, - B: StoreBuilder, + B: StoreBuilder, { p: PhantomData, f: PhantomData, @@ -51,7 +99,7 @@ where impl Suite where S: RaftStorageDebug + RaftStorage, - B: StoreBuilder, + B: StoreBuilder, { fn test_store(builder: &B) -> Result<()> { run_fut(Suite::get_membership_config_default(builder))?; @@ -72,7 +120,7 @@ where } pub async fn get_membership_config_default(builder: &B) -> Result<()> { - let store = builder.new_store(NODE_ID); + let store = builder.new_store(NODE_ID).await; let membership = store.get_membership_config().await?; @@ -88,7 +136,7 @@ where } pub async fn get_membership_config_from_log_and_sm(builder: &B) -> Result<()> { - let store = builder.new_store(NODE_ID); + let store = builder.new_store(NODE_ID).await; tracing::info!("--- no log, read membership from state machine"); { @@ -175,7 +223,7 @@ where } pub async fn get_initial_state_default(builder: &B) -> Result<()> { - let store = builder.new_store(NODE_ID); + let store = builder.new_store(NODE_ID).await; let expected_hs = HardState { current_term: 0, @@ -211,7 +259,7 @@ where } pub async fn get_initial_state_with_state(builder: &B) -> Result<()> { - let store = builder.new_store(NODE_ID); + let store = builder.new_store(NODE_ID).await; Self::default_hard_state(&store).await?; store @@ -254,7 +302,7 @@ where pub async fn get_initial_state_membership_from_log_and_sm(builder: &B) -> Result<()> { // It should never return membership from logs that are included in state machine present. - let store = builder.new_store(NODE_ID); + let store = builder.new_store(NODE_ID).await; Self::default_hard_state(&store).await?; // copy the test from get_membership_config @@ -344,7 +392,7 @@ where } pub async fn get_initial_state_last_log_gt_sm(builder: &B) -> Result<()> { - let store = builder.new_store(NODE_ID); + let store = builder.new_store(NODE_ID).await; Self::default_hard_state(&store).await?; store @@ -379,7 +427,7 @@ where pub async fn get_initial_state_last_log_lt_sm(builder: &B) -> Result<()> { // TODO(xp): check membership: read from log first, then state machine then default. - let store = builder.new_store(NODE_ID); + let store = builder.new_store(NODE_ID).await; Self::default_hard_state(&store).await?; store @@ -407,7 +455,7 @@ where } pub async fn save_hard_state(builder: &B) -> Result<()> { - let store = builder.new_store(NODE_ID); + let store = builder.new_store(NODE_ID).await; store .save_hard_state(&HardState { @@ -429,7 +477,7 @@ where } pub async fn get_log_entries(builder: &B) -> Result<()> { - let store = builder.new_store(NODE_ID); + let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; tracing::info!("--- get start > stop"); @@ -459,7 +507,7 @@ where pub async fn delete_logs_from(builder: &B) -> Result<()> { tracing::info!("--- delete start > stop"); { - let store = builder.new_store(NODE_ID); + let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; store.delete_logs_from(10, Some(1)).await?; @@ -470,7 +518,7 @@ where tracing::info!("--- delete start == stop"); { - let store = builder.new_store(NODE_ID); + let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; store.delete_logs_from(1, Some(1)).await?; @@ -481,7 +529,7 @@ where tracing::info!("--- delete start < stop"); { - let store = builder.new_store(NODE_ID); + let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; store.delete_logs_from(1, Some(4)).await?; @@ -493,7 +541,7 @@ where tracing::info!("--- delete start < large stop"); { - let store = builder.new_store(NODE_ID); + let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; store.delete_logs_from(1, Some(1000)).await?; @@ -504,7 +552,7 @@ where tracing::info!("--- delete start, None"); { - let store = builder.new_store(NODE_ID); + let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; store.delete_logs_from(1, None).await?; @@ -517,7 +565,7 @@ where } pub async fn append_to_log(builder: &B) -> Result<()> { - let store = builder.new_store(NODE_ID); + let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; store @@ -536,7 +584,7 @@ where } pub async fn apply_single(builder: &B) -> Result<()> { - let store = builder.new_store(NODE_ID); + let store = builder.new_store(NODE_ID).await; let entry = Entry { log_id: LogId { term: 3, index: 1 }, @@ -574,7 +622,7 @@ where } pub async fn apply_multi(builder: &B) -> Result<()> { - let store = builder.new_store(NODE_ID); + let store = builder.new_store(NODE_ID).await; let req0 = ClientRequest { client: "1".into(), @@ -671,3 +719,486 @@ where Ok(()) } } + +// Defensive test: +// If a RaftStore impl support defensive check, enable it and check if it returns errors when abnormal input is seen. +// A RaftStore with defensive check is able to expose bugs in raft core. +impl Suite +where + S: RaftStorageDebug + RaftStorage, + B: StoreBuilder, +{ + fn test_store_defensive(builder: &B) -> Result<()> { + run_fut(Suite::df_get_membership_config_dirty_log(builder))?; + run_fut(Suite::df_get_initial_state_dirty_log(builder))?; + run_fut(Suite::df_save_hard_state_ascending(builder))?; + run_fut(Suite::df_delete_logs_from(builder))?; + run_fut(Suite::df_append_to_log_nonempty_input(builder))?; + run_fut(Suite::df_append_to_log_nonconsecutive_input(builder))?; + run_fut(Suite::df_append_to_log_eq_last_plus_one(builder))?; + run_fut(Suite::df_append_to_log_eq_last_applied_plus_one(builder))?; + run_fut(Suite::df_append_to_log_gt_last_log_id(builder))?; + run_fut(Suite::df_append_to_log_gt_last_applied_id(builder))?; + run_fut(Suite::df_apply_nonempty_input(builder))?; + run_fut(Suite::df_apply_index_eq_last_applied_plus_one(builder))?; + run_fut(Suite::df_apply_gt_last_applied_id(builder))?; + + Ok(()) + } + + pub async fn df_get_membership_config_dirty_log(builder: &B) -> Result<()> { + let store = builder.new_store(NODE_ID).await; + + tracing::info!("--- dirty log: log.index > last_applied.index && log < last_applied"); + { + store + .append_to_log(&[ + &Entry { + log_id: LogId { term: 1, index: 1 }, + payload: EntryPayload::Blank, + }, + &Entry { + log_id: LogId { term: 1, index: 2 }, + payload: EntryPayload::Blank, + }, + &Entry { + log_id: LogId { term: 1, index: 3 }, + payload: EntryPayload::ConfigChange(EntryConfigChange { + membership: MembershipConfig { + members: btreeset! {1,2,3}, + members_after_consensus: None, + }, + }), + }, + ]) + .await?; + store + .apply_to_state_machine(&[ + &Entry { + log_id: LogId { term: 2, index: 1 }, + payload: EntryPayload::Blank, + }, + &Entry { + log_id: LogId { term: 2, index: 2 }, + payload: EntryPayload::ConfigChange(EntryConfigChange { + membership: MembershipConfig { + members: btreeset! {3,4,5}, + members_after_consensus: None, + }, + }), + }, + ]) + .await?; + + let mem = store.get_membership_config().await; + assert!(mem.is_err()); + } + + Ok(()) + } + + pub async fn df_get_initial_state_dirty_log(builder: &B) -> Result<()> { + let store = builder.new_store(NODE_ID).await; + + tracing::info!("--- dirty log: log.index > last_applied.index && log < last_applied"); + { + store + .append_to_log(&[ + &Entry { + log_id: LogId { term: 1, index: 1 }, + payload: EntryPayload::Blank, + }, + &Entry { + log_id: LogId { term: 1, index: 2 }, + payload: EntryPayload::Blank, + }, + &Entry { + log_id: LogId { term: 1, index: 3 }, + payload: EntryPayload::ConfigChange(EntryConfigChange { + membership: MembershipConfig { + members: btreeset! {1,2,3}, + members_after_consensus: None, + }, + }), + }, + ]) + .await?; + + store + .apply_to_state_machine(&[ + &Entry { + log_id: LogId { term: 2, index: 1 }, + payload: EntryPayload::Blank, + }, + &Entry { + log_id: LogId { term: 2, index: 2 }, + payload: EntryPayload::ConfigChange(EntryConfigChange { + membership: MembershipConfig { + members: btreeset! {3,4,5}, + members_after_consensus: None, + }, + }), + }, + ]) + .await?; + + let state = store.get_initial_state().await; + assert!(state.is_err()); + } + + Ok(()) + } + + pub async fn df_save_hard_state_ascending(builder: &B) -> Result<()> { + let store = builder.new_store(NODE_ID).await; + + store + .save_hard_state(&HardState { + current_term: 10, + voted_for: Some(NODE_ID), + }) + .await?; + + tracing::info!("--- lower term is rejected"); + { + let res = store + .save_hard_state(&HardState { + current_term: 9, + voted_for: Some(NODE_ID), + }) + .await; + + assert!(res.is_err()); + + let state = store.get_initial_state().await?; + + assert_eq!( + HardState { + current_term: 10, + voted_for: Some(NODE_ID), + }, + state.hard_state, + ); + } + + tracing::info!("--- same term can not reset to None"); + { + let res = store + .save_hard_state(&HardState { + current_term: 10, + voted_for: None, + }) + .await; + + assert!(res.is_err()); + + let state = store.get_initial_state().await?; + + assert_eq!( + HardState { + current_term: 10, + voted_for: Some(NODE_ID), + }, + state.hard_state, + ); + } + + tracing::info!("--- same term can not change voted_for"); + { + let res = store + .save_hard_state(&HardState { + current_term: 10, + voted_for: Some(1000), + }) + .await; + + assert!(res.is_err()); + + let state = store.get_initial_state().await?; + + assert_eq!( + HardState { + current_term: 10, + voted_for: Some(NODE_ID), + }, + state.hard_state, + ); + } + + Ok(()) + } + + pub async fn df_delete_logs_from(_builder: &B) -> Result<()> { + // TODO(xp): what should we test about this? + Ok(()) + } + + pub async fn df_append_to_log_nonempty_input(builder: &B) -> Result<()> { + let store = builder.new_store(NODE_ID).await; + + let res = store.append_to_log(Vec::<&Entry<_>>::new().as_slice()).await; + assert!(res.is_err()); + + Ok(()) + } + + pub async fn df_append_to_log_nonconsecutive_input(builder: &B) -> Result<()> { + let store = builder.new_store(NODE_ID).await; + + let res = store + .append_to_log(&[ + &Entry { + log_id: (1, 1).into(), + payload: EntryPayload::Blank, + }, + &Entry { + log_id: (1, 3).into(), + payload: EntryPayload::Blank, + }, + ]) + .await; + assert!(res.is_err()); + + Ok(()) + } + + pub async fn df_append_to_log_eq_last_plus_one(builder: &B) -> Result<()> { + let store = builder.new_store(NODE_ID).await; + + tracing::info!("-- log_id <= last_applied"); + tracing::info!("-- nonconsecutive log"); + tracing::info!("-- overlapping log"); + + store + .append_to_log(&[ + &Entry { + log_id: (1, 1).into(), + payload: EntryPayload::Blank, + }, + &Entry { + log_id: (1, 2).into(), + payload: EntryPayload::Blank, + }, + ]) + .await?; + + store + .apply_to_state_machine(&[&Entry { + log_id: LogId { term: 1, index: 1 }, + payload: EntryPayload::Blank, + }]) + .await?; + + let res = store + .append_to_log(&[&Entry { + log_id: (3, 4).into(), + payload: EntryPayload::Blank, + }]) + .await; + + assert!(res.is_err()); + + Ok(()) + } + + pub async fn df_append_to_log_eq_last_applied_plus_one(builder: &B) -> Result<()> { + // last_log: 1,1 + // last_applied: 1,2 + // append_to_log: 1,4 + let store = builder.new_store(NODE_ID).await; + + tracing::info!("-- log_id <= last_applied"); + tracing::info!("-- nonconsecutive log"); + tracing::info!("-- overlapping log"); + + store + .append_to_log(&[ + &Entry { + log_id: (1, 1).into(), + payload: EntryPayload::Blank, + }, + &Entry { + log_id: (1, 2).into(), + payload: EntryPayload::Blank, + }, + ]) + .await?; + + store + .apply_to_state_machine(&[ + &Entry { + log_id: LogId { term: 1, index: 1 }, + payload: EntryPayload::Blank, + }, + &Entry { + log_id: LogId { term: 1, index: 2 }, + payload: EntryPayload::Blank, + }, + ]) + .await?; + + store.delete_logs_from(1, Some(2)).await?; + + let res = store + .append_to_log(&[&Entry { + log_id: (1, 4).into(), + payload: EntryPayload::Blank, + }]) + .await; + + assert!(res.is_err()); + + Ok(()) + } + + pub async fn df_append_to_log_gt_last_log_id(builder: &B) -> Result<()> { + // last_log: 2,2 + // append_to_log: 1,3: index == last + 1 but term is lower + let store = builder.new_store(NODE_ID).await; + + store + .append_to_log(&[ + &Entry { + log_id: (2, 1).into(), + payload: EntryPayload::Blank, + }, + &Entry { + log_id: (2, 2).into(), + payload: EntryPayload::Blank, + }, + ]) + .await?; + + let res = store + .append_to_log(&[&Entry { + log_id: (1, 3).into(), + payload: EntryPayload::Blank, + }]) + .await; + + assert!(res.is_err()); + + Ok(()) + } + + pub async fn df_append_to_log_gt_last_applied_id(builder: &B) -> Result<()> { + // last_log: 2,1 + // last_applied: 2,2 + // append_to_log: 1,3: index == last + 1 but term is lower + let store = builder.new_store(NODE_ID).await; + + store + .append_to_log(&[ + &Entry { + log_id: (2, 1).into(), + payload: EntryPayload::Blank, + }, + &Entry { + log_id: (2, 2).into(), + payload: EntryPayload::Blank, + }, + ]) + .await?; + + store + .apply_to_state_machine(&[ + &Entry { + log_id: LogId { term: 2, index: 1 }, + payload: EntryPayload::Blank, + }, + &Entry { + log_id: LogId { term: 2, index: 2 }, + payload: EntryPayload::Blank, + }, + ]) + .await?; + + store.delete_logs_from(1, Some(2)).await?; + + let res = store + .append_to_log(&[&Entry { + log_id: (1, 3).into(), + payload: EntryPayload::Blank, + }]) + .await; + + assert!(res.is_err()); + + Ok(()) + } + + pub async fn df_apply_nonempty_input(builder: &B) -> Result<()> { + let store = builder.new_store(NODE_ID).await; + + let res = store.apply_to_state_machine(Vec::<&Entry<_>>::new().as_slice()).await; + assert!(res.is_err()); + + Ok(()) + } + + pub async fn df_apply_index_eq_last_applied_plus_one(builder: &B) -> Result<()> { + let store = builder.new_store(NODE_ID).await; + + let entry = Entry { + log_id: LogId { term: 3, index: 1 }, + + payload: EntryPayload::Normal(EntryNormal { + data: ClientRequest { + client: "0".into(), + serial: 0, + status: "lit".into(), + }, + }), + }; + + store.apply_to_state_machine(&[&entry]).await?; + + tracing::info!("--- re-apply 1th"); + { + let res = store.apply_to_state_machine(&[&entry]).await; + assert!(res.is_err()); + } + + tracing::info!("--- apply 3rd when there is only 1st"); + { + let entry = Entry { + log_id: LogId { term: 3, index: 3 }, + + payload: EntryPayload::Normal(EntryNormal { + data: ClientRequest { + client: "0".into(), + serial: 0, + status: "lit".into(), + }, + }), + }; + let res = store.apply_to_state_machine(&[&entry]).await; + assert!(res.is_err()); + } + + Ok(()) + } + + pub async fn df_apply_gt_last_applied_id(builder: &B) -> Result<()> { + let store = builder.new_store(NODE_ID).await; + + let entry = Entry { + log_id: LogId { term: 3, index: 1 }, + payload: EntryPayload::Blank, + }; + + store.apply_to_state_machine(&[&entry]).await?; + + tracing::info!("--- next apply with last_index+1 but lower term"); + { + let entry = Entry { + log_id: LogId { term: 2, index: 2 }, + payload: EntryPayload::Blank, + }; + let res = store.apply_to_state_machine(&[&entry]).await; + assert!(res.is_err()); + } + + Ok(()) + } +}