diff --git a/async-raft/src/core/mod.rs b/async-raft/src/core/mod.rs index 5870c8868..5a255a168 100644 --- a/async-raft/src/core/mod.rs +++ b/async-raft/src/core/mod.rs @@ -208,7 +208,7 @@ impl, S: RaftStorage> Ra if let Some(snapshot) = self.storage.get_current_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))? { - self.snapshot_last_log_id = snapshot.last_log_id; + self.snapshot_last_log_id = snapshot.meta.last_log_id; self.report_metrics(Update::Ignore); } @@ -452,8 +452,8 @@ impl, S: RaftStorage> Ra match res { Ok(res) => match res { Ok(snapshot) => { - let _ = tx_compaction.try_send(SnapshotUpdate::SnapshotComplete(snapshot.last_log_id)); - let _ = chan_tx.send(snapshot.last_log_id.index); // This will always succeed. + let _ = tx_compaction.try_send(SnapshotUpdate::SnapshotComplete(snapshot.meta.last_log_id)); + let _ = chan_tx.send(snapshot.meta.last_log_id.index); // This will always succeed. } Err(err) => { tracing::error!({error=%err}, "error while generating snapshot"); diff --git a/async-raft/src/core/replication.rs b/async-raft/src/core/replication.rs index 8daae04e4..9711fd114 100644 --- a/async-raft/src/core/replication.rs +++ b/async-raft/src/core/replication.rs @@ -288,7 +288,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // If snapshot exists, ensure its distance from the leader's last log index is <= half // of the configured snapshot threshold, else create a new snapshot. if snapshot_is_within_half_of_threshold( - &snapshot.last_log_id.index, + &snapshot.meta.last_log_id.index, &self.core.last_log_id.index, &threshold, ) { diff --git a/async-raft/src/lib.rs b/async-raft/src/lib.rs index b0047f284..e1486ac53 100644 --- a/async-raft/src/lib.rs +++ b/async-raft/src/lib.rs @@ -12,7 +12,6 @@ pub mod raft; mod raft_types; mod replication; pub mod storage; - pub use async_trait; use serde::de::DeserializeOwned; use serde::Serialize; @@ -35,6 +34,7 @@ pub use crate::raft_types::SnapshotSegmentId; pub use crate::raft_types::Update; pub use crate::replication::ReplicationMetrics; pub use crate::storage::RaftStorage; +pub use crate::storage::SnapshotMeta; /// A Raft node's ID. pub type NodeId = u64; diff --git a/async-raft/src/replication/mod.rs b/async-raft/src/replication/mod.rs index 10ceec199..71d12028f 100644 --- a/async-raft/src/replication/mod.rs +++ b/async-raft/src/replication/mod.rs @@ -829,10 +829,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage #[tracing::instrument(level = "trace", skip(self, snapshot))] async fn stream_snapshot(&mut self, mut snapshot: CurrentSnapshotData) -> RaftResult<()> { - let snapshot_id = snapshot.snapshot_id.clone(); + let snapshot_id = snapshot.meta.snapshot_id.clone(); let mut offset = 0; - self.core.next_index = snapshot.last_log_id.index + 1; - self.core.matched = snapshot.last_log_id; + self.core.next_index = snapshot.meta.last_log_id.index + 1; + self.core.matched = snapshot.meta.last_log_id; let mut buf = Vec::with_capacity(self.core.config.snapshot_max_chunk_size as usize); loop { // Build the RPC. @@ -843,7 +843,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage term: self.core.term, leader_id: self.core.id, snapshot_id: snapshot_id.clone(), - last_log_id: snapshot.last_log_id, + last_log_id: snapshot.meta.last_log_id, offset, data: Vec::from(&buf[..nread]), done, diff --git a/async-raft/src/storage.rs b/async-raft/src/storage.rs index e906de578..bd5a08c01 100644 --- a/async-raft/src/storage.rs +++ b/async-raft/src/storage.rs @@ -18,10 +18,8 @@ use crate::AppDataResponse; use crate::LogId; use crate::NodeId; -/// The data associated with the current snapshot. -pub struct CurrentSnapshotData -where S: AsyncRead + AsyncSeek + Send + Unpin + 'static -{ +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct SnapshotMeta { // Log entries upto which this snapshot includes, inclusive. pub last_log_id: LogId, @@ -29,6 +27,14 @@ where S: AsyncRead + AsyncSeek + Send + Unpin + 'static pub membership: MembershipConfig, pub snapshot_id: SnapshotId, +} + +/// The data associated with the current snapshot. +pub struct CurrentSnapshotData +where S: AsyncRead + AsyncSeek + Send + Unpin + 'static +{ + /// metadata of a snapshot + pub meta: SnapshotMeta, /// A read handle to the associated snapshot. pub snapshot: Box, diff --git a/async-raft/tests/fixtures/mod.rs b/async-raft/tests/fixtures/mod.rs index b2d9aaf88..e1b9b8782 100644 --- a/async-raft/tests/fixtures/mod.rs +++ b/async-raft/tests/fixtures/mod.rs @@ -501,27 +501,27 @@ impl RaftRouter { .unwrap_or_else(|| panic!("no snapshot present for node {}", id)); match index_test { ValueTest::Exact(index) => assert_eq!( - &snap.last_log_id.index, index, + &snap.meta.last_log_id.index, index, "expected node {} to have snapshot with index {}, got {}", - id, index, snap.last_log_id.index + id, index, snap.meta.last_log_id.index ), ValueTest::Range(range) => assert!( - range.contains(&snap.last_log_id.index), + range.contains(&snap.meta.last_log_id.index), "expected node {} to have snapshot within range {:?}, got {}", id, range, - snap.last_log_id.index + snap.meta.last_log_id.index ), } assert_eq!( - &snap.last_log_id.term, term, + &snap.meta.last_log_id.term, term, "expected node {} to have snapshot with term {}, got {}", - id, term, snap.last_log_id.term + id, term, snap.meta.last_log_id.term ); assert_eq!( - &snap.membership, cfg, + &snap.meta.membership, cfg, "expected node {} to have membership config {:?}, got {:?}", - id, cfg, snap.membership + id, cfg, snap.meta.membership ); } let sm = storage.get_state_machine().await; diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index 722d6a32b..3b5a20f28 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -22,7 +22,7 @@ use async_raft::AppDataResponse; use async_raft::LogId; use async_raft::NodeId; use async_raft::RaftStorage; -use async_raft::SnapshotId; +use async_raft::SnapshotMeta; use serde::Deserialize; use serde::Serialize; use thiserror::Error; @@ -67,13 +67,8 @@ pub enum ShutdownError { /// The application snapshot type which the `MemStore` works with. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct MemStoreSnapshot { - /// The last log covered by this snapshot. - pub last_log_id: LogId, + pub meta: SnapshotMeta, - /// The last membership config included in this snapshot. - pub membership: MembershipConfig, - - pub snapshot_id: SnapshotId, /// The data of the state machine at the time of this snapshot. pub data: Vec, } @@ -339,12 +334,14 @@ impl RaftStorage for MemStore { snapshot_id = format!("{}-{}-{}", term, last_applied_log, snapshot_idx); let snapshot = MemStoreSnapshot { - last_log_id: LogId { - term, - index: last_applied_log, + meta: SnapshotMeta { + last_log_id: LogId { + term, + index: last_applied_log, + }, + snapshot_id: snapshot_id.clone(), + membership: membership_config.clone(), }, - snapshot_id: snapshot_id.clone(), - membership: membership_config.clone(), data, }; snapshot_bytes = serde_json::to_vec(&snapshot)?; @@ -353,9 +350,11 @@ impl RaftStorage for MemStore { tracing::trace!({ snapshot_size = snapshot_bytes.len() }, "log compaction complete"); Ok(CurrentSnapshotData { - last_log_id: (term, last_applied_log).into(), - membership: membership_config.clone(), - snapshot_id, + meta: SnapshotMeta { + last_log_id: (term, last_applied_log).into(), + membership: membership_config.clone(), + snapshot_id, + }, snapshot: Box::new(Cursor::new(snapshot_bytes)), }) } @@ -435,9 +434,7 @@ impl RaftStorage for MemStore { Some(snapshot) => { let reader = serde_json::to_vec(&snapshot)?; Ok(Some(CurrentSnapshotData { - last_log_id: snapshot.last_log_id, - membership: snapshot.membership.clone(), - snapshot_id: snapshot.snapshot_id.clone(), + meta: snapshot.meta.clone(), snapshot: Box::new(Cursor::new(reader)), })) }