diff --git a/async-raft/src/core/install_snapshot.rs b/async-raft/src/core/install_snapshot.rs index 891776346..62fe1aa8f 100644 --- a/async-raft/src/core/install_snapshot.rs +++ b/async-raft/src/core/install_snapshot.rs @@ -194,6 +194,8 @@ impl, S: RaftStorage> Ra // --------------------------------------------------------------------> time // ``` + // TODO(xp): do not install if self.last_applied >= snapshot.meta.last_applied + let changes = self .storage .finalize_snapshot_installation(&req.meta, snapshot) @@ -207,6 +209,9 @@ impl, S: RaftStorage> Ra // If you have any question about this, let me know: drdr.xp at gmail.com if let Some(last_applied) = changes.last_applied { + // Applied logs are not needed. + self.storage.delete_logs_from(..=last_applied.index).await.map_err(|e| self.map_storage_error(e))?; + // snapshot is installed self.last_applied = last_applied; diff --git a/async-raft/src/raft_types.rs b/async-raft/src/raft_types.rs index f07f51bb8..ad483461d 100644 --- a/async-raft/src/raft_types.rs +++ b/async-raft/src/raft_types.rs @@ -60,6 +60,7 @@ pub enum Update { /// E.g. when applying a log to state machine, or installing a state machine from snapshot. #[derive(Debug, Clone, PartialEq, Eq)] pub struct StateMachineChanges { + // TODO(xp): it does not need to be an Option pub last_applied: Option, pub is_snapshot: bool, } diff --git a/async-raft/src/storage.rs b/async-raft/src/storage.rs index 0557f0726..0cad6ee1f 100644 --- a/async-raft/src/storage.rs +++ b/async-raft/src/storage.rs @@ -237,19 +237,10 @@ where /// Finalize the installation of a snapshot which has finished streaming from the cluster leader. /// - /// Delete all entries in the log through `meta.last_log_id.index`. - /// - /// Write a new snapshot pointer to the log at the given `meta.last_log_id.index`. The snapshot pointer should be - /// constructed via the `Entry::new_snapshot_pointer` constructor and the other parameters - /// provided to this method. - /// /// All other snapshots should be deleted at this point. /// /// ### snapshot - /// A snapshot created from an earlier call to `created_snapshot` which provided the snapshot. - /// By the time ownership of the snapshot object is returned here, its - /// `AsyncWriteExt.shutdown()` method will have been called, so no additional writes should be - /// made to the snapshot. + /// A snapshot created from an earlier call to `begin_receiving_snapshot` which provided the snapshot. /// /// Errors returned from this method will cause Raft to go into shutdown. async fn finalize_snapshot_installation( diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index 858e53025..75793351c 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -802,15 +802,6 @@ impl RaftStorage for MemStore { tracing::debug!("JSON SNAP DATA:{}", y); } - // Update log. - { - let mut log = self.log.write().await; - - // Remove logs that are included in the snapshot. - // Leave at least one log or the replication can not find out the mismatched log. - *log = log.split_off(&meta.last_log_id.index); - } - // Update the state machine. { let new_sm: MemStoreStateMachine = serde_json::from_slice(&new_snapshot.data).map_err(|e| { diff --git a/memstore/src/test.rs b/memstore/src/test.rs index d305cfc20..464f3d7ef 100644 --- a/memstore/src/test.rs +++ b/memstore/src/test.rs @@ -120,6 +120,8 @@ where run_fut(Suite::apply_single(builder))?; run_fut(Suite::apply_multi(builder))?; + // TODO(xp): test: finalized_snapshot, do_log_compaction, begin_receiving_snapshot, get_current_snapshot + Ok(()) }