diff --git a/async-raft/src/core/replication.rs b/async-raft/src/core/replication.rs index 9f0a5dfd7..eeea3a82e 100644 --- a/async-raft/src/core/replication.rs +++ b/async-raft/src/core/replication.rs @@ -15,7 +15,7 @@ use crate::quorum; use crate::replication::RaftEvent; use crate::replication::ReplicaEvent; use crate::replication::ReplicationStream; -use crate::storage::CurrentSnapshotData; +use crate::storage::Snapshot; use crate::AppData; use crate::AppDataResponse; use crate::LogId; @@ -277,7 +277,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage async fn handle_needs_snapshot( &mut self, _: NodeId, - tx: oneshot::Sender>, + tx: oneshot::Sender>, ) -> RaftResult<()> { // Ensure snapshotting is configured, else do nothing. let threshold = match &self.core.config.snapshot_policy { diff --git a/async-raft/src/replication/mod.rs b/async-raft/src/replication/mod.rs index 623d02800..3fbeea51f 100644 --- a/async-raft/src/replication/mod.rs +++ b/async-raft/src/replication/mod.rs @@ -26,7 +26,7 @@ use crate::error::RaftResult; use crate::raft::AppendEntriesRequest; use crate::raft::Entry; use crate::raft::InstallSnapshotRequest; -use crate::storage::CurrentSnapshotData; +use crate::storage::Snapshot; use crate::AppData; use crate::AppDataResponse; use crate::LogId; @@ -566,7 +566,7 @@ where S: AsyncRead + AsyncSeek + Send + Unpin + 'static /// The ID of the target node from which the event was sent. target: NodeId, /// The response channel for delivering the snapshot data. - tx: oneshot::Sender>, + tx: oneshot::Sender>, }, /// Some critical error has taken place, and Raft needs to shutdown. Shutdown, @@ -801,8 +801,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage struct SnapshottingState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> { /// An exclusive handle to the replication core. replication_core: &'a mut ReplicationCore, - snapshot: Option>, - snapshot_fetch_rx: Option>>, + snapshot: Option>, + snapshot_fetch_rx: Option>>, } impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> SnapshottingState<'a, D, R, N, S> { @@ -865,7 +865,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage /// If an error comes up during processing, this routine should simple be called again after /// issuing a new request to the storage layer. #[tracing::instrument(level = "trace", skip(self, rx))] - async fn wait_for_snapshot(&mut self, mut rx: oneshot::Receiver>) { + async fn wait_for_snapshot(&mut self, mut rx: oneshot::Receiver>) { loop { let span = tracing::debug_span!("FFF:wait_for_snapshot"); let _ent = span.enter(); @@ -898,7 +898,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } #[tracing::instrument(level = "trace", skip(self, snapshot))] - async fn stream_snapshot(&mut self, mut snapshot: CurrentSnapshotData) -> RaftResult<()> { + async fn stream_snapshot(&mut self, mut snapshot: Snapshot) -> RaftResult<()> { let end = snapshot.snapshot.seek(SeekFrom::End(0)).await?; let mut offset = 0; diff --git a/async-raft/src/storage.rs b/async-raft/src/storage.rs index bd690d2b5..2abb7f49b 100644 --- a/async-raft/src/storage.rs +++ b/async-raft/src/storage.rs @@ -32,7 +32,7 @@ pub struct SnapshotMeta { } /// The data associated with the current snapshot. -pub struct CurrentSnapshotData +pub struct Snapshot where S: AsyncRead + AsyncSeek + Send + Unpin + 'static { /// metadata of a snapshot @@ -214,7 +214,7 @@ where /// log covered by the snapshot. /// /// Errors returned from this method will be logged and retried. - async fn do_log_compaction(&self) -> Result>; + async fn do_log_compaction(&self) -> Result>; /// Create a new blank snapshot, returning a writable handle to the snapshot object. /// @@ -261,7 +261,7 @@ where /// of the snapshot, which should be decoded for creating this method's response data. /// /// Errors returned from this method will cause Raft to go into shutdown. - async fn get_current_snapshot(&self) -> Result>>; + async fn get_current_snapshot(&self) -> Result>>; } /// APIs for debugging a store. diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index 925f5cce4..54fddd0d0 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -14,9 +14,9 @@ use async_raft::async_trait::async_trait; use async_raft::raft::Entry; use async_raft::raft::EntryPayload; use async_raft::raft::MembershipConfig; -use async_raft::storage::CurrentSnapshotData; use async_raft::storage::HardState; use async_raft::storage::InitialState; +use async_raft::storage::Snapshot; use async_raft::AppData; use async_raft::AppDataResponse; use async_raft::LogId; @@ -344,7 +344,7 @@ impl RaftStorage for MemStore { } #[tracing::instrument(level = "trace", skip(self))] - async fn do_log_compaction(&self) -> Result> { + async fn do_log_compaction(&self) -> Result> { let (data, last_applied_log); let membership_config; { @@ -390,7 +390,7 @@ impl RaftStorage for MemStore { } // Release log & snapshot write locks. tracing::info!({ snapshot_size = snapshot_size }, "log compaction complete"); - Ok(CurrentSnapshotData { + Ok(Snapshot { meta, snapshot: Box::new(Cursor::new(data)), }) @@ -448,13 +448,13 @@ impl RaftStorage for MemStore { } #[tracing::instrument(level = "trace", skip(self))] - async fn get_current_snapshot(&self) -> Result>> { + async fn get_current_snapshot(&self) -> Result>> { match &*self.current_snapshot.read().await { Some(snapshot) => { // TODO(xp): try not to clone the entire data. // If snapshot.data is Arc that impl AsyncRead etc then the sharing can be done. let data = snapshot.data.clone(); - Ok(Some(CurrentSnapshotData { + Ok(Some(Snapshot { meta: snapshot.meta.clone(), snapshot: Box::new(Cursor::new(data)), }))