diff --git a/openraft/src/core/mod.rs b/openraft/src/core/mod.rs index 1fe9a2393..a0e170f64 100644 --- a/openraft/src/core/mod.rs +++ b/openraft/src/core/mod.rs @@ -51,33 +51,19 @@ use crate::raft::RaftRespTx; use crate::raft_types::LogIdOptionExt; use crate::replication::ReplicaEvent; use crate::replication::ReplicationStream; -use crate::storage::HardState; -use crate::AppData; -use crate::AppDataResponse; -use crate::LogId; -use crate::Membership; +use crate::types::v070::AppData; +use crate::types::v070::AppDataResponse; +use crate::types::v070::EffectiveMembership; +use crate::types::v070::HardState; +use crate::types::v070::LogId; +use crate::types::v070::Membership; +use crate::types::v070::NodeId; +use crate::types::v070::RaftNetwork; +use crate::types::v070::RaftStorage; +use crate::types::v070::StorageError; use crate::MessageSummary; -use crate::NodeId; -use crate::RaftNetwork; -use crate::RaftStorage; -use crate::StorageError; use crate::Update; -/// The currently active membership config. -/// -/// It includes: -/// - the id of the log that sets this membership config, -/// - and the config. -/// -/// An active config is just the last seen config in raft spec. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct EffectiveMembership { - /// The id of the log that applies this membership config - pub log_id: LogId, - - pub membership: Membership, -} - impl EffectiveMembership { pub fn new_initial(node_id: u64) -> Self { EffectiveMembership { diff --git a/openraft/src/core/replication.rs b/openraft/src/core/replication.rs index 0254eee68..21032ce3a 100644 --- a/openraft/src/core/replication.rs +++ b/openraft/src/core/replication.rs @@ -14,16 +14,16 @@ use crate::raft::RaftRespTx; use crate::replication::RaftEvent; use crate::replication::ReplicaEvent; use crate::replication::ReplicationStream; -use crate::storage::Snapshot; use crate::summary::MessageSummary; -use crate::AppData; -use crate::AppDataResponse; -use crate::LogId; -use crate::NodeId; -use crate::RaftNetwork; -use crate::RaftStorage; +use crate::types::v070::AppData; +use crate::types::v070::AppDataResponse; +use crate::types::v070::LogId; +use crate::types::v070::NodeId; +use crate::types::v070::RaftNetwork; +use crate::types::v070::RaftStorage; +use crate::types::v070::Snapshot; +use crate::types::v070::StorageError; use crate::ReplicationMetrics; -use crate::StorageError; impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> LeaderState<'a, D, R, N, S> { /// Spawn a new replication stream returning its replication state handle. diff --git a/openraft/src/defensive.rs b/openraft/src/defensive.rs index 5f3a0731e..8696eb72f 100644 --- a/openraft/src/defensive.rs +++ b/openraft/src/defensive.rs @@ -6,11 +6,11 @@ use async_trait::async_trait; use crate::raft::Entry; use crate::raft_types::LogIdOptionExt; -use crate::storage::HardState; use crate::AppData; use crate::AppDataResponse; use crate::DefensiveError; use crate::ErrorSubject; +use crate::HardState; use crate::LogId; use crate::RaftStorage; use crate::StorageError; diff --git a/openraft/src/error.rs b/openraft/src/error.rs index dedcbb14a..cb4009e5b 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -7,9 +7,9 @@ use std::time::Duration; use serde::Deserialize; use serde::Serialize; -use crate::raft_types::SnapshotSegmentId; use crate::LogId; use crate::NodeId; +use crate::SnapshotSegmentId; use crate::StorageError; /// Fatal is unrecoverable and shuts down raft at once. diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index ba9eb532b..46d5313a5 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -18,68 +18,50 @@ pub mod network; pub mod raft; pub mod storage; pub mod testing; +pub mod types; #[cfg(test)] mod metrics_wait_test; pub use async_trait; -use serde::de::DeserializeOwned; -use serde::Serialize; pub use crate::config::Config; pub use crate::config::ConfigError; pub use crate::config::SnapshotPolicy; -pub use crate::core::EffectiveMembership; pub use crate::core::State; pub use crate::defensive::DefensiveCheck; -pub use crate::membership::Membership; pub use crate::metrics::RaftMetrics; -pub use crate::network::RaftNetwork; pub use crate::raft::Raft; -pub use crate::raft_types::LogId; pub use crate::raft_types::LogIdOptionExt; -pub use crate::raft_types::SnapshotId; -pub use crate::raft_types::SnapshotSegmentId; -pub use crate::raft_types::StateMachineChanges; pub use crate::raft_types::Update; pub use crate::replication::ReplicationMetrics; -pub use crate::storage::RaftStorage; -pub use crate::storage::RaftStorageDebug; -pub use crate::storage::SnapshotMeta; -pub use crate::storage_error::DefensiveError; -pub use crate::storage_error::ErrorSubject; -pub use crate::storage_error::ErrorVerb; -pub use crate::storage_error::StorageError; -pub use crate::storage_error::StorageIOError; -pub use crate::storage_error::Violation; pub use crate::store_ext::StoreExt; pub use crate::store_wrapper::Wrapper; pub use crate::summary::MessageSummary; - -/// A Raft node's ID. -pub type NodeId = u64; - -/// A trait defining application specific data. -/// -/// The intention of this trait is that applications which are using this crate will be able to -/// use their own concrete data types throughout their application without having to serialize and -/// deserialize their data as it goes through Raft. Instead, applications can present their data -/// models as-is to Raft, Raft will present it to the application's `RaftStorage` impl when ready, -/// and the application may then deal with the data directly in the storage engine without having -/// to do a preliminary deserialization. -pub trait AppData: Clone + Send + Sync + Serialize + DeserializeOwned + 'static {} - -/// A trait defining application specific response data. -/// -/// The intention of this trait is that applications which are using this crate will be able to -/// use their own concrete data types for returning response data from the storage layer when an -/// entry is applied to the state machine as part of a client request (this is not used during -/// replication). This allows applications to seamlessly return application specific data from -/// their storage layer, up through Raft, and back into their application for returning -/// data to clients. -/// -/// This type must encapsulate both success and error responses, as application specific logic -/// related to the success or failure of a client request — application specific validation logic, -/// enforcing of data constraints, and anything of that nature — are expressly out of the realm of -/// the Raft consensus protocol. -pub trait AppDataResponse: Clone + Send + Sync + Serialize + DeserializeOwned + 'static {} +pub use crate::types::v070::AppData; +pub use crate::types::v070::AppDataResponse; +pub use crate::types::v070::AppendEntriesRequest; +pub use crate::types::v070::AppendEntriesResponse; +pub use crate::types::v070::DefensiveError; +pub use crate::types::v070::EffectiveMembership; +pub use crate::types::v070::Entry; +pub use crate::types::v070::EntryPayload; +pub use crate::types::v070::ErrorSubject; +pub use crate::types::v070::ErrorVerb; +pub use crate::types::v070::HardState; +pub use crate::types::v070::InitialState; +pub use crate::types::v070::LogId; +pub use crate::types::v070::LogState; +pub use crate::types::v070::Membership; +pub use crate::types::v070::NodeId; +pub use crate::types::v070::RaftNetwork; +pub use crate::types::v070::RaftStorage; +pub use crate::types::v070::RaftStorageDebug; +pub use crate::types::v070::Snapshot; +pub use crate::types::v070::SnapshotId; +pub use crate::types::v070::SnapshotMeta; +pub use crate::types::v070::SnapshotSegmentId; +pub use crate::types::v070::StateMachineChanges; +pub use crate::types::v070::StorageError; +pub use crate::types::v070::StorageIOError; +pub use crate::types::v070::Violation; diff --git a/openraft/src/membership/membership.rs b/openraft/src/membership/membership.rs index 69aa95e03..fdfe828c4 100644 --- a/openraft/src/membership/membership.rs +++ b/openraft/src/membership/membership.rs @@ -6,26 +6,12 @@ use std::collections::BTreeMap; use std::collections::BTreeSet; use maplit::btreeset; -use serde::Deserialize; -use serde::Serialize; use crate::membership::quorum; +use crate::Membership; use crate::MessageSummary; use crate::NodeId; -/// The membership configuration of the cluster. -/// -/// It could be a joint of one, two or more configs, i.e., a quorum is a node set that is superset of a majority of -/// every config. -#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct Membership { - /// Multi configs. - configs: Vec>, - - /// Cache of all node ids. - all_nodes: BTreeSet, -} - impl MessageSummary for Membership { fn summary(&self) -> String { let mut res = vec!["[".to_string()]; diff --git a/openraft/src/membership/mod.rs b/openraft/src/membership/mod.rs index fed2d03fe..bb559479c 100644 --- a/openraft/src/membership/mod.rs +++ b/openraft/src/membership/mod.rs @@ -5,5 +5,3 @@ mod membership; mod membership_test; pub mod quorum; - -pub use membership::Membership; diff --git a/openraft/src/metrics.rs b/openraft/src/metrics.rs index 757f0c811..2d18d9e74 100644 --- a/openraft/src/metrics.rs +++ b/openraft/src/metrics.rs @@ -17,10 +17,10 @@ use tokio::sync::watch; use tokio::time::Duration; use tokio::time::Instant; -use crate::core::EffectiveMembership; use crate::core::State; use crate::error::Fatal; use crate::raft_types::LogIdOptionExt; +use crate::EffectiveMembership; use crate::LogId; use crate::Membership; use crate::MessageSummary; diff --git a/openraft/src/metrics_wait_test.rs b/openraft/src/metrics_wait_test.rs index 1a688a71c..a29753665 100644 --- a/openraft/src/metrics_wait_test.rs +++ b/openraft/src/metrics_wait_test.rs @@ -4,10 +4,10 @@ use maplit::btreeset; use tokio::sync::watch; use tokio::time::sleep; -use crate::core::EffectiveMembership; use crate::metrics::Wait; use crate::metrics::WaitError; use crate::raft_types::LogIdOptionExt; +use crate::types::v070::EffectiveMembership; use crate::LogId; use crate::Membership; use crate::RaftMetrics; diff --git a/openraft/src/network.rs b/openraft/src/network.rs index 26f1406b6..8b1378917 100644 --- a/openraft/src/network.rs +++ b/openraft/src/network.rs @@ -1,35 +1 @@ -//! The Raft network interface. -use anyhow::Result; -use async_trait::async_trait; - -use crate::raft::AppendEntriesRequest; -use crate::raft::AppendEntriesResponse; -use crate::raft::InstallSnapshotRequest; -use crate::raft::InstallSnapshotResponse; -use crate::raft::VoteRequest; -use crate::raft::VoteResponse; -use crate::AppData; -use crate::NodeId; - -/// A trait defining the interface for a Raft network between cluster members. -/// -/// See the [network chapter of the guide](https://datafuselabs.github.io/openraft/network.html) -/// for details and discussion on this trait and how to implement it. -#[async_trait] -pub trait RaftNetwork: Send + Sync + 'static -where D: AppData -{ - /// Send an AppendEntries RPC to the target Raft node (§5). - async fn send_append_entries(&self, target: NodeId, rpc: AppendEntriesRequest) -> Result; - - /// Send an InstallSnapshot RPC to the target Raft node (§7). - async fn send_install_snapshot( - &self, - target: NodeId, - rpc: InstallSnapshotRequest, - ) -> Result; - - /// Send a RequestVote RPC to the target Raft node (§5). - async fn send_vote(&self, target: NodeId, rpc: VoteRequest) -> Result; -} diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index 388f2d4fa..bf25c387c 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -1,12 +1,9 @@ //! Public Raft interface and data types. use std::collections::BTreeSet; -use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; -use serde::Deserialize; -use serde::Serialize; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::watch; @@ -26,15 +23,25 @@ use crate::error::InstallSnapshotError; use crate::error::VoteError; use crate::metrics::RaftMetrics; use crate::metrics::Wait; -use crate::AppData; -use crate::AppDataResponse; -use crate::LogId; -use crate::Membership; +pub use crate::types::v070::AddLearnerResponse; +use crate::types::v070::AppData; +use crate::types::v070::AppDataResponse; +pub use crate::types::v070::AppendEntriesRequest; +pub use crate::types::v070::AppendEntriesResponse; +pub use crate::types::v070::ClientWriteRequest; +pub use crate::types::v070::ClientWriteResponse; +pub use crate::types::v070::Entry; +pub use crate::types::v070::EntryPayload; +pub use crate::types::v070::InstallSnapshotRequest; +pub use crate::types::v070::InstallSnapshotResponse; +use crate::types::v070::LogId; +pub use crate::types::v070::Membership; +use crate::types::v070::NodeId; +use crate::types::v070::RaftNetwork; +use crate::types::v070::RaftStorage; +pub use crate::types::v070::VoteRequest; +pub use crate::types::v070::VoteResponse; use crate::MessageSummary; -use crate::NodeId; -use crate::RaftNetwork; -use crate::RaftStorage; -use crate::SnapshotMeta; struct RaftInner, S: RaftStorage> { tx_api: mpsc::UnboundedSender<(RaftMsg, Span)>, @@ -422,11 +429,6 @@ impl, S: RaftStorage> Cl pub(crate) type RaftRespTx = oneshot::Sender>; pub(crate) type RaftRespRx = oneshot::Receiver>; -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct AddLearnerResponse { - pub matched: Option, -} - /// A message coming from the Raft API. pub(crate) enum RaftMsg { AppendEntries { @@ -509,28 +511,6 @@ where ////////////////////////////////////////////////////////////////////////////////////////////////// -/// An RPC sent by a cluster leader to replicate log entries (§5.3), and as a heartbeat (§5.2). -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct AppendEntriesRequest { - /// The leader's current term. - pub term: u64, - - /// The leader's ID. Useful in redirecting clients. - pub leader_id: u64, - - pub prev_log_id: Option, - - /// The new log entries to store. - /// - /// This may be empty when the leader is sending heartbeats. Entries - /// are batched for efficiency. - #[serde(bound = "D: AppData")] - pub entries: Vec>, - - /// The leader's committed log id. - pub leader_commit: Option, -} - impl MessageSummary for AppendEntriesRequest { fn summary(&self) -> String { format!( @@ -544,16 +524,6 @@ impl MessageSummary for AppendEntriesRequest { } } -/// The response to an `AppendEntriesRequest`. -#[derive(Debug, Serialize, Deserialize)] -pub struct AppendEntriesResponse { - /// The responding node's current term, for leader to update itself. - pub term: u64, - - pub success: bool, - pub conflict: bool, -} - impl MessageSummary for AppendEntriesResponse { fn summary(&self) -> String { format!( @@ -563,16 +533,6 @@ impl MessageSummary for AppendEntriesResponse { } } -/// A Raft log entry. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct Entry { - pub log_id: LogId, - - /// This entry's payload. - #[serde(bound = "D: AppData")] - pub payload: EntryPayload, -} - impl MessageSummary for Entry { fn summary(&self) -> String { format!("{}:{}", self.log_id, self.payload.summary()) @@ -614,19 +574,6 @@ impl MessageSummary for &[&Entry] { } } -/// Log entry payload variants. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub enum EntryPayload { - /// An empty payload committed by a new cluster leader. - Blank, - - #[serde(bound = "D: AppData")] - Normal(D), - - /// A change-membership log entry. - Membership(Membership), -} - impl MessageSummary for EntryPayload { fn summary(&self) -> String { match self { @@ -639,17 +586,6 @@ impl MessageSummary for EntryPayload { } } -/// An RPC sent by candidates to gather votes (§5.2). -#[derive(Debug, Serialize, Deserialize)] -pub struct VoteRequest { - /// The candidate's current term. - pub term: u64, - - pub candidate_id: u64, - - pub last_log_id: Option, -} - impl MessageSummary for VoteRequest { fn summary(&self) -> String { format!("{}-{}, last_log:{:?}", self.term, self.candidate_id, self.last_log_id) @@ -666,41 +602,6 @@ impl VoteRequest { } } -/// The response to a `VoteRequest`. -#[derive(Debug, Serialize, Deserialize)] -pub struct VoteResponse { - /// The current term of the responding node, for the candidate to update itself. - pub term: u64, - - /// Will be true if the candidate received a vote from the responder. - pub vote_granted: bool, - - /// The last log id stored on the remote voter. - pub last_log_id: Option, -} - -////////////////////////////////////////////////////////////////////////////////////////////////// - -/// An RPC sent by the Raft leader to send chunks of a snapshot to a follower (§7). -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct InstallSnapshotRequest { - /// The leader's current term. - pub term: u64, - /// The leader's ID. Useful in redirecting clients. - pub leader_id: u64, - - /// Metadata of a snapshot: snapshot_id, last_log_ed membership etc. - pub meta: SnapshotMeta, - - /// The byte offset where this chunk of data is positioned in the snapshot file. - pub offset: u64, - /// The raw bytes of the snapshot chunk, starting at `offset`. - pub data: Vec, - - /// Will be `true` if this is the last chunk in the snapshot. - pub done: bool, -} - impl MessageSummary for InstallSnapshotRequest { fn summary(&self) -> String { format!( @@ -715,26 +616,6 @@ impl MessageSummary for InstallSnapshotRequest { } } -/// The response to an `InstallSnapshotRequest`. -#[derive(Debug, Serialize, Deserialize)] -pub struct InstallSnapshotResponse { - /// The receiving node's current term, for leader to update itself. - pub term: u64, -} - -////////////////////////////////////////////////////////////////////////////////////////////////// - -/// An application specific client request to update the state of the system (§5.1). -/// -/// The entry of this payload will be appended to the Raft log and then applied to the Raft state -/// machine according to the Raft protocol. -#[derive(Debug, Serialize, Deserialize)] -pub struct ClientWriteRequest { - /// The application specific contents of this client request. - #[serde(bound = "D: AppData")] - pub(crate) payload: EntryPayload, -} - impl MessageSummary for ClientWriteRequest { fn summary(&self) -> String { self.payload.summary() @@ -747,19 +628,6 @@ impl ClientWriteRequest { } } -/// The response to a `ClientRequest`. -#[derive(Debug, Serialize, Deserialize)] -pub struct ClientWriteResponse { - pub log_id: LogId, - - /// Application specific response data. - #[serde(bound = "R: AppDataResponse")] - pub data: R, - - /// If the log entry is a change-membership entry. - pub membership: Option, -} - impl MessageSummary for ClientWriteResponse { fn summary(&self) -> String { format!("log_id: {}, membership: {:?}", self.log_id, self.membership) diff --git a/openraft/src/raft_types.rs b/openraft/src/raft_types.rs index 40110f63f..bb59eb2c1 100644 --- a/openraft/src/raft_types.rs +++ b/openraft/src/raft_types.rs @@ -4,13 +4,8 @@ use std::fmt::Formatter; use serde::Deserialize; use serde::Serialize; -/// The identity of a raft log. -/// A term and an index identifies an log globally. -#[derive(Debug, Default, Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize)] -pub struct LogId { - pub term: u64, - pub index: u64, -} +use crate::LogId; +use crate::SnapshotSegmentId; impl From<(u64, u64)> for LogId { fn from(v: (u64, u64)) -> Self { @@ -86,16 +81,6 @@ impl LogIndexOptionExt for Option { } } -// Everytime a snapshot is created, it is assigned with a globally unique id. -pub type SnapshotId = String; - -/// The identity of a segment of a snapshot. -#[derive(Debug, Default, Clone, PartialOrd, PartialEq, Eq, Serialize, Deserialize)] -pub struct SnapshotSegmentId { - pub id: SnapshotId, - pub offset: u64, -} - impl From<(D, u64)> for SnapshotSegmentId { fn from(v: (D, u64)) -> Self { SnapshotSegmentId { @@ -117,11 +102,3 @@ pub enum Update { Update(T), AsIs, } - -/// The changes of a state machine. -/// E.g. when applying a log to state machine, or installing a state machine from snapshot. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct StateMachineChanges { - pub last_applied: LogId, - pub is_snapshot: bool, -} diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index a7b6c6b46..030e8f0dd 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -27,7 +27,6 @@ use crate::raft::AppendEntriesRequest; use crate::raft::InstallSnapshotRequest; use crate::raft_types::LogIdOptionExt; use crate::raft_types::LogIndexOptionExt; -use crate::storage::Snapshot; use crate::AppData; use crate::AppDataResponse; use crate::LogId; @@ -35,6 +34,7 @@ use crate::MessageSummary; use crate::NodeId; use crate::RaftNetwork; use crate::RaftStorage; +use crate::Snapshot; #[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct ReplicationMetrics { diff --git a/openraft/src/storage.rs b/openraft/src/storage.rs index 310bf83f4..5dd4e7fa2 100644 --- a/openraft/src/storage.rs +++ b/openraft/src/storage.rs @@ -1,328 +1,6 @@ //! The Raft storage interface and data types. -use std::fmt::Debug; -use std::ops::RangeBounds; - -use async_trait::async_trait; -use serde::Deserialize; -use serde::Serialize; -use tokio::io::AsyncRead; -use tokio::io::AsyncSeek; -use tokio::io::AsyncWrite; - -use crate::core::EffectiveMembership; -use crate::defensive::check_range_matches_entries; -use crate::raft::Entry; -use crate::raft::EntryPayload; -use crate::raft_types::SnapshotId; -use crate::raft_types::StateMachineChanges; -use crate::AppData; -use crate::AppDataResponse; -use crate::LogId; -use crate::LogIdOptionExt; -use crate::NodeId; -use crate::StorageError; - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] -pub struct SnapshotMeta { - // Log entries upto which this snapshot includes, inclusive. - pub last_log_id: LogId, - - /// To identify a snapshot when transferring. - /// Caveat: even when two snapshot is built with the same `last_log_id`, they still could be different in bytes. - pub snapshot_id: SnapshotId, -} - -/// The data associated with the current snapshot. -#[derive(Debug)] -pub struct Snapshot -where S: AsyncRead + AsyncSeek + Send + Unpin + 'static -{ - /// metadata of a snapshot - pub meta: SnapshotMeta, - - /// A read handle to the associated snapshot. - pub snapshot: Box, -} - -/// A record holding the hard state of a Raft node. -/// -/// This model derives serde's traits for easily (de)serializing this -/// model for storage & retrieval. -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)] -pub struct HardState { - /// The last recorded term observed by this system. - pub current_term: u64, - /// The ID of the node voted for in the `current_term`. - pub voted_for: Option, -} - -/// A struct used to represent the initial state which a Raft node needs when first starting. -#[derive(Clone, Debug, Default, PartialEq, Eq)] -pub struct InitialState { - /// The last entry. - pub last_log_id: Option, - - /// The LogId of the last log applied to the state machine. - pub last_applied: Option, - - /// The saved hard state of the node. - pub hard_state: HardState, - - /// The latest cluster membership configuration found, in log or in state machine, else a new initial - /// membership config consisting only of this node's ID. - pub last_membership: Option, -} - -/// The state about logs. -/// -/// Invariance: last_purged_log_id <= last_applied <= last_log_id -#[derive(Clone, Debug, Default, PartialEq, Eq)] -pub struct LogState { - /// The greatest log id that has been purged after being applied to state machine. - pub last_purged_log_id: Option, - - /// The log id of the last present entry if there are any entries. - /// Otherwise the same value as `last_purged_log_id`. - pub last_log_id: Option, -} - -/// A trait defining the interface for a Raft storage system. -/// -/// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/storage.html) -/// for details and discussion on this trait and how to implement it. -#[async_trait] -pub trait RaftStorage: Send + Sync + 'static -where - D: AppData, - R: AppDataResponse, -{ - // TODO(xp): simplify storage API - - /// The storage engine's associated type used for exposing a snapshot for reading & writing. - /// - /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#implement-raftstorage) - /// for details on where and how this is used. - type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static; - - /// Returns the last membership config found in log or state machine. - async fn get_membership(&self) -> Result, StorageError> { - let (_, sm_mem) = self.last_applied_state().await?; - - let sm_mem_index = match &sm_mem { - None => 0, - Some(mem) => mem.log_id.index, - }; - - let log_mem = self.last_membership_in_log(sm_mem_index + 1).await?; - - if log_mem.is_some() { - return Ok(log_mem); - } - - return Ok(sm_mem); - } - - /// Get the latest membership config found in the log. - /// - /// This method should returns membership with the greatest log index which is `>=since_index`. - /// If no such membership log is found, it returns `None`, e.g., when logs are cleaned after being applied. - #[tracing::instrument(level = "trace", skip(self))] - async fn last_membership_in_log(&self, since_index: u64) -> Result, StorageError> { - let st = self.get_log_state().await?; - - let mut end = st.last_log_id.next_index(); - let start = std::cmp::max(st.last_purged_log_id.next_index(), since_index); - let step = 64; - - while start < end { - let step_start = std::cmp::max(start, end.saturating_sub(step)); - let entries = self.try_get_log_entries(step_start..end).await?; - - for ent in entries.iter().rev() { - if let EntryPayload::Membership(ref mem) = ent.payload { - return Ok(Some(EffectiveMembership { - log_id: ent.log_id, - membership: mem.clone(), - })); - } - } - - end = end.saturating_sub(step); - } - - Ok(None) - } - - /// Get Raft's state information from storage. - /// - /// When the Raft node is first started, it will call this interface to fetch the last known state from stable - /// storage. - async fn get_initial_state(&self) -> Result { - let hs = self.read_hard_state().await?; - let st = self.get_log_state().await?; - let mut last_log_id = st.last_log_id; - let (last_applied, _) = self.last_applied_state().await?; - let membership = self.get_membership().await?; - - // Clean up dirty state: snapshot is installed but logs are not cleaned. - if last_log_id < last_applied { - self.purge_logs_upto(last_applied.unwrap()).await?; - last_log_id = last_applied; - } - - Ok(InitialState { - last_log_id, - last_applied, - hard_state: hs.unwrap_or_default(), - last_membership: membership, - }) - } - - /// Get a series of log entries from storage. - /// - /// Similar to `try_get_log_entries` except an error will be returned if there is an entry not found in the - /// specified range. - async fn get_log_entries + Clone + Debug + Send + Sync>( - &self, - range: RB, - ) -> Result>, StorageError> { - let res = self.try_get_log_entries(range.clone()).await?; - - check_range_matches_entries(range, &res)?; - - Ok(res) - } - - /// Try to get an log entry. - /// - /// It does not return an error if the log entry at `log_index` is not found. - async fn try_get_log_entry(&self, log_index: u64) -> Result>, StorageError> { - let mut res = self.try_get_log_entries(log_index..(log_index + 1)).await?; - Ok(res.pop()) - } - - /// Get the log id of the entry at `index`. - async fn get_log_id(&self, log_index: u64) -> Result { - let st = self.get_log_state().await?; - - if Some(log_index) == st.last_purged_log_id.index() { - return Ok(st.last_purged_log_id.unwrap()); - } - - let entries = self.get_log_entries(log_index..=log_index).await?; - - Ok(entries[0].log_id) - } - - // --- Hard State - - async fn save_hard_state(&self, hs: &HardState) -> Result<(), StorageError>; - - async fn read_hard_state(&self) -> Result, StorageError>; - - // --- Log - - /// Returns the last deleted log id and the last log id. - /// - /// The impl should not consider the applied log id in state machine. - /// The returned `last_log_id` could be the log id of the last present log entry, or the `last_purged_log_id` if - /// there is no entry at all. - async fn get_log_state(&self) -> Result; - - /// Get a series of log entries from storage. - /// - /// The start value is inclusive in the search and the stop value is non-inclusive: `[start, stop)`. - /// - /// Entry that is not found is allowed. - async fn try_get_log_entries + Clone + Debug + Send + Sync>( - &self, - range: RB, - ) -> Result>, StorageError>; - - /// Append a payload of entries to the log. - /// - /// Though the entries will always be presented in order, each entry's index should be used to - /// determine its location to be written in the log. - async fn append_to_log(&self, entries: &[&Entry]) -> Result<(), StorageError>; - - /// Delete conflict log entries since `log_id`, inclusive. - async fn delete_conflict_logs_since(&self, log_id: LogId) -> Result<(), StorageError>; - - /// Delete applied log entries upto `log_id`, inclusive. - async fn purge_logs_upto(&self, log_id: LogId) -> Result<(), StorageError>; - - // --- State Machine - - /// Returns the last applied log id which is recorded in state machine, and the last applied membership log id and - /// membership config. - async fn last_applied_state(&self) -> Result<(Option, Option), StorageError>; - - /// Apply the given payload of entries to the state machine. - /// - /// The Raft protocol guarantees that only logs which have been _committed_, that is, logs which - /// have been replicated to a quorum of the cluster, will be applied to the state machine. - /// - /// This is where the business logic of interacting with your application's state machine - /// should live. This is 100% application specific. Perhaps this is where an application - /// specific transaction is being started, or perhaps committed. This may be where a key/value - /// is being stored. - /// - /// An impl should do: - /// - Store the last applied log id. - /// - Deal with the EntryPayload::Normal() log, which is business logic log. - /// - Deal with EntryPayload::Membership, store the membership config. - async fn apply_to_state_machine(&self, entries: &[&Entry]) -> Result, StorageError>; - - // --- Snapshot - - /// Build snapshot - /// - /// A snapshot has to contain information about exactly all logs upto the last applied. - /// - /// Building snapshot can be done by: - /// - Performing log compaction, e.g. merge log entries that operates on the same key, like a LSM-tree does, - /// - or by fetching a snapshot from the state machine. - async fn build_snapshot(&self) -> Result, StorageError>; - - /// Create a new blank snapshot, returning a writable handle to the snapshot object. - /// - /// Raft will use this handle to receive snapshot data. - /// - /// ### implementation guide - /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/storage.html) - /// for details on log compaction / snapshotting. - async fn begin_receiving_snapshot(&self) -> Result, StorageError>; - - /// Install a snapshot which has finished streaming from the cluster leader. - /// - /// All other snapshots should be deleted at this point. - /// - /// ### snapshot - /// A snapshot created from an earlier call to `begin_receiving_snapshot` which provided the snapshot. - async fn install_snapshot( - &self, - meta: &SnapshotMeta, - snapshot: Box, - ) -> Result; - - /// Get a readable handle to the current snapshot, along with its metadata. - /// - /// ### implementation algorithm - /// Implementing this method should be straightforward. Check the configured snapshot - /// directory for any snapshot files. A proper implementation will only ever have one - /// active snapshot, though another may exist while it is being created. As such, it is - /// recommended to use a file naming pattern which will allow for easily distinguishing between - /// the current live snapshot, and any new snapshot which is being created. - /// - /// A proper snapshot implementation will store the term, index and membership config as part - /// of the snapshot, which should be decoded for creating this method's response data. - async fn get_current_snapshot(&self) -> Result>, StorageError>; -} - -/// APIs for debugging a store. -#[async_trait] -pub trait RaftStorageDebug { - /// Get a handle to the state machine for testing purposes. - async fn get_state_machine(&self) -> SM; -} +pub use crate::types::v070::HardState; +pub use crate::types::v070::LogState; +pub use crate::types::v070::RaftStorage; +pub use crate::types::v070::Snapshot; diff --git a/openraft/src/storage_error.rs b/openraft/src/storage_error.rs index 5a2a188dd..de0a67730 100644 --- a/openraft/src/storage_error.rs +++ b/openraft/src/storage_error.rs @@ -1,27 +1,14 @@ use std::backtrace::Backtrace; use std::fmt::Formatter; -use std::ops::Bound; use anyerror::AnyError; -use serde::Deserialize; -use serde::Serialize; -use crate::storage::HardState; -use crate::LogId; -use crate::SnapshotMeta; - -/// An error that occurs when the RaftStore impl runs defensive check of input or output. -/// E.g. re-applying an log entry is a violation that may be a potential bug. -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize)] -pub struct DefensiveError { - /// The subject that violates store defensive check, e.g. hard-state, log or state machine. - pub subject: ErrorSubject, - - /// The description of the violation. - pub violation: Violation, - - pub backtrace: String, -} +use crate::DefensiveError; +use crate::ErrorSubject; +use crate::ErrorVerb; +use crate::StorageError; +use crate::StorageIOError; +use crate::Violation; impl DefensiveError { pub fn new(subject: ErrorSubject, violation: Violation) -> DefensiveError { @@ -39,114 +26,6 @@ impl std::fmt::Display for DefensiveError { } } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub enum ErrorSubject { - /// A general storage error - Store, - - /// HardState related error. - HardState, - - /// Error that is happened when operating a series of log entries - Logs, - - /// Error about a single log entry - Log(LogId), - - /// Error about a single log entry without knowing the log term. - LogIndex(u64), - - /// Error happened when applying a log entry - Apply(LogId), - - /// Error happened when operating state machine. - StateMachine, - - /// Error happened when operating snapshot. - Snapshot(SnapshotMeta), - - None, -} - -/// What it is doing when an error occurs. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub enum ErrorVerb { - Read, - Write, - Seek, - Delete, -} - -/// Violations a store would return when running defensive check. -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize)] -pub enum Violation { - #[error("term can only be change to a greater value, current: {curr}, change to {to}")] - TermNotAscending { curr: u64, to: u64 }, - - #[error("voted_for can not change from Some() to other Some(), current: {curr:?}, change to {to:?}")] - VotedForChanged { curr: HardState, to: HardState }, - - #[error("log at higher index is obsolete: {higher_index_log_id:?} should GT {lower_index_log_id:?}")] - DirtyLog { - higher_index_log_id: LogId, - lower_index_log_id: LogId, - }, - - #[error("try to get log at index {want} but got {got:?}")] - LogIndexNotFound { want: u64, got: Option }, - - #[error("range is empty: start: {start:?}, end: {end:?}")] - RangeEmpty { start: Option, end: Option }, - - #[error("range is not half-open: start: {start:?}, end: {end:?}")] - RangeNotHalfOpen { start: Bound, end: Bound }, - - // TODO(xp): rename this to some input related error name. - #[error("empty log vector")] - LogsEmpty, - - #[error("all logs are removed. It requires at least one log to track continuity")] - StoreLogsEmpty, - - #[error("logs are not consecutive, prev: {prev:?}, next: {next}")] - LogsNonConsecutive { prev: Option, next: LogId }, - - #[error("invalid next log to apply: prev: {prev:?}, next: {next}")] - ApplyNonConsecutive { prev: Option, next: LogId }, - - #[error("applied log can not conflict, last_applied: {last_applied:?}, delete since: {first_conflict_log_id}")] - AppliedWontConflict { - last_applied: Option, - first_conflict_log_id: LogId, - }, - - #[error("not allowed to purge non-applied logs, last_applied: {last_applied:?}, purge upto: {purge_upto}")] - PurgeNonApplied { - last_applied: Option, - purge_upto: LogId, - }, -} - -/// A storage error could be either a defensive check error or an error occurred when doing the actual io operation. -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize)] -pub enum StorageError { - /// An error raised by defensive check. - #[error(transparent)] - Defensive { - #[from] - #[backtrace] - source: DefensiveError, - }, - - /// An error raised by io operation. - #[error(transparent)] - IO { - #[from] - #[backtrace] - source: StorageIOError, - }, -} - impl StorageError { pub fn into_defensive(self) -> Option { match self { @@ -168,15 +47,6 @@ impl StorageError { } } -/// Error that occurs when operating the store. -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize)] -pub struct StorageIOError { - subject: ErrorSubject, - verb: ErrorVerb, - source: AnyError, - backtrace: String, -} - impl std::fmt::Display for StorageIOError { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "when {:?} {:?}: {}", self.verb, self.subject, self.source) diff --git a/openraft/src/store_ext.rs b/openraft/src/store_ext.rs index 2ed5e9dfc..d4e4886aa 100644 --- a/openraft/src/store_ext.rs +++ b/openraft/src/store_ext.rs @@ -5,17 +5,17 @@ use std::sync::RwLock; use crate::async_trait::async_trait; use crate::raft::Entry; -use crate::storage::HardState; -use crate::storage::LogState; -use crate::storage::Snapshot; use crate::summary::MessageSummary; +use crate::types::v070::log_state::LogState; use crate::AppData; use crate::AppDataResponse; use crate::DefensiveCheck; use crate::EffectiveMembership; +use crate::HardState; use crate::LogId; use crate::RaftStorage; use crate::RaftStorageDebug; +use crate::Snapshot; use crate::SnapshotMeta; use crate::StateMachineChanges; use crate::StorageError; diff --git a/openraft/src/testing/suite.rs b/openraft/src/testing/suite.rs index 62af4b506..62442ce63 100644 --- a/openraft/src/testing/suite.rs +++ b/openraft/src/testing/suite.rs @@ -6,16 +6,16 @@ use maplit::btreeset; use crate::raft::Entry; use crate::raft::EntryPayload; -use crate::storage::HardState; -use crate::storage::InitialState; -use crate::storage::LogState; use crate::testing::DefensiveStoreBuilder; use crate::testing::StoreBuilder; +use crate::types::v070::log_state::LogState; use crate::AppData; use crate::AppDataResponse; use crate::DefensiveError; use crate::EffectiveMembership; use crate::ErrorSubject; +use crate::HardState; +use crate::InitialState; use crate::LogId; use crate::Membership; use crate::RaftStorage; diff --git a/openraft/src/types/mod.rs b/openraft/src/types/mod.rs new file mode 100644 index 000000000..f578b7c2b --- /dev/null +++ b/openraft/src/types/mod.rs @@ -0,0 +1 @@ +pub mod v070; diff --git a/openraft/src/types/v070/app.rs b/openraft/src/types/v070/app.rs new file mode 100644 index 000000000..f45d80419 --- /dev/null +++ b/openraft/src/types/v070/app.rs @@ -0,0 +1,30 @@ +use serde::de::DeserializeOwned; +use serde::Serialize; + +/// A Raft node's ID. +pub type NodeId = u64; + +/// A trait defining application specific data. +/// +/// The intention of this trait is that applications which are using this crate will be able to +/// use their own concrete data types throughout their application without having to serialize and +/// deserialize their data as it goes through Raft. Instead, applications can present their data +/// models as-is to Raft, Raft will present it to the application's `RaftStorage` impl when ready, +/// and the application may then deal with the data directly in the storage engine without having +/// to do a preliminary deserialization. +pub trait AppData: Clone + Send + Sync + Serialize + DeserializeOwned + 'static {} + +/// A trait defining application specific response data. +/// +/// The intention of this trait is that applications which are using this crate will be able to +/// use their own concrete data types for returning response data from the storage layer when an +/// entry is applied to the state machine as part of a client request (this is not used during +/// replication). This allows applications to seamlessly return application specific data from +/// their storage layer, up through Raft, and back into their application for returning +/// data to clients. +/// +/// This type must encapsulate both success and error responses, as application specific logic +/// related to the success or failure of a client request — application specific validation logic, +/// enforcing of data constraints, and anything of that nature — are expressly out of the realm of +/// the Raft consensus protocol. +pub trait AppDataResponse: Clone + Send + Sync + Serialize + DeserializeOwned + 'static {} diff --git a/openraft/src/types/v070/effective_membership.rs b/openraft/src/types/v070/effective_membership.rs new file mode 100644 index 000000000..f5142021e --- /dev/null +++ b/openraft/src/types/v070/effective_membership.rs @@ -0,0 +1,20 @@ +use serde::Deserialize; +use serde::Serialize; + +use super::LogId; +use super::Membership; + +/// The currently active membership config. +/// +/// It includes: +/// - the id of the log that sets this membership config, +/// - and the config. +/// +/// An active config is just the last seen config in raft spec. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct EffectiveMembership { + /// The id of the log that applies this membership config + pub log_id: LogId, + + pub membership: Membership, +} diff --git a/openraft/src/types/v070/entry.rs b/openraft/src/types/v070/entry.rs new file mode 100644 index 000000000..e6f4e2582 --- /dev/null +++ b/openraft/src/types/v070/entry.rs @@ -0,0 +1,29 @@ +use serde::Deserialize; +use serde::Serialize; + +use super::AppData; +use super::LogId; +use super::Membership; + +/// A Raft log entry. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct Entry { + pub log_id: LogId, + + /// This entry's payload. + #[serde(bound = "D: AppData")] + pub payload: EntryPayload, +} + +/// Log entry payload variants. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum EntryPayload { + /// An empty payload committed by a new cluster leader. + Blank, + + #[serde(bound = "D: AppData")] + Normal(D), + + /// A change-membership log entry. + Membership(Membership), +} diff --git a/openraft/src/types/v070/hard_state.rs b/openraft/src/types/v070/hard_state.rs new file mode 100644 index 000000000..57e08d1f4 --- /dev/null +++ b/openraft/src/types/v070/hard_state.rs @@ -0,0 +1,16 @@ +use serde::Deserialize; +use serde::Serialize; + +use super::NodeId; + +/// A record holding the hard state of a Raft node. +/// +/// This model derives serde's traits for easily (de)serializing this +/// model for storage & retrieval. +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)] +pub struct HardState { + /// The last recorded term observed by this system. + pub current_term: u64, + /// The ID of the node voted for in the `current_term`. + pub voted_for: Option, +} diff --git a/openraft/src/types/v070/initial_state.rs b/openraft/src/types/v070/initial_state.rs new file mode 100644 index 000000000..1b90fc3f3 --- /dev/null +++ b/openraft/src/types/v070/initial_state.rs @@ -0,0 +1,20 @@ +use super::EffectiveMembership; +use super::HardState; +use super::LogId; + +/// A struct used to represent the initial state which a Raft node needs when first starting. +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct InitialState { + /// The last entry. + pub last_log_id: Option, + + /// The LogId of the last log applied to the state machine. + pub last_applied: Option, + + /// The saved hard state of the node. + pub hard_state: HardState, + + /// The latest cluster membership configuration found, in log or in state machine, else a new initial + /// membership config consisting only of this node's ID. + pub last_membership: Option, +} diff --git a/openraft/src/types/v070/log_id.rs b/openraft/src/types/v070/log_id.rs new file mode 100644 index 000000000..4c0348e14 --- /dev/null +++ b/openraft/src/types/v070/log_id.rs @@ -0,0 +1,10 @@ +use serde::Deserialize; +use serde::Serialize; + +/// The identity of a raft log. +/// A term and an index identifies an log globally. +#[derive(Debug, Default, Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize)] +pub struct LogId { + pub term: u64, + pub index: u64, +} diff --git a/openraft/src/types/v070/log_state.rs b/openraft/src/types/v070/log_state.rs new file mode 100644 index 000000000..10321e5fd --- /dev/null +++ b/openraft/src/types/v070/log_state.rs @@ -0,0 +1,14 @@ +use super::LogId; + +/// The state about logs. +/// +/// Invariance: last_purged_log_id <= last_applied <= last_log_id +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct LogState { + /// The greatest log id that has been purged after being applied to state machine. + pub last_purged_log_id: Option, + + /// The log id of the last present entry if there are any entries. + /// Otherwise the same value as `last_purged_log_id`. + pub last_log_id: Option, +} diff --git a/openraft/src/types/v070/membership.rs b/openraft/src/types/v070/membership.rs new file mode 100644 index 000000000..6840af3c0 --- /dev/null +++ b/openraft/src/types/v070/membership.rs @@ -0,0 +1,19 @@ +use std::collections::BTreeSet; + +use serde::Deserialize; +use serde::Serialize; + +use super::NodeId; + +/// The membership configuration of the cluster. +/// +/// It could be a joint of one, two or more configs, i.e., a quorum is a node set that is superset of a majority of +/// every config. +#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct Membership { + /// Multi configs. + pub(crate) configs: Vec>, + + /// Cache of all node ids. + pub(crate) all_nodes: BTreeSet, +} diff --git a/openraft/src/types/v070/mod.rs b/openraft/src/types/v070/mod.rs new file mode 100644 index 000000000..ca3c760ba --- /dev/null +++ b/openraft/src/types/v070/mod.rs @@ -0,0 +1,49 @@ +pub mod app; +pub mod effective_membership; +pub mod entry; +pub mod hard_state; +pub mod initial_state; +pub mod log_id; +pub mod log_state; +pub mod membership; +pub mod network; +pub mod rpc; +pub mod snapshot; +pub mod state_machine; +pub mod storage; +pub mod storage_error; + +pub use app::AppData; +pub use app::AppDataResponse; +pub use app::NodeId; +pub use effective_membership::EffectiveMembership; +pub use entry::Entry; +pub use entry::EntryPayload; +pub use hard_state::HardState; +pub use initial_state::InitialState; +pub use log_id::LogId; +pub use log_state::LogState; +pub use membership::Membership; +pub use network::RaftNetwork; +pub use rpc::AddLearnerResponse; +pub use rpc::AppendEntriesRequest; +pub use rpc::AppendEntriesResponse; +pub use rpc::ClientWriteRequest; +pub use rpc::ClientWriteResponse; +pub use rpc::InstallSnapshotRequest; +pub use rpc::InstallSnapshotResponse; +pub use rpc::VoteRequest; +pub use rpc::VoteResponse; +pub use snapshot::Snapshot; +pub use snapshot::SnapshotId; +pub use snapshot::SnapshotMeta; +pub use snapshot::SnapshotSegmentId; +pub use state_machine::StateMachineChanges; +pub use storage::RaftStorage; +pub use storage::RaftStorageDebug; +pub use storage_error::DefensiveError; +pub use storage_error::ErrorSubject; +pub use storage_error::ErrorVerb; +pub use storage_error::StorageError; +pub use storage_error::StorageIOError; +pub use storage_error::Violation; diff --git a/openraft/src/types/v070/network.rs b/openraft/src/types/v070/network.rs new file mode 100644 index 000000000..c8269e6a3 --- /dev/null +++ b/openraft/src/types/v070/network.rs @@ -0,0 +1,33 @@ +use anyhow::Result; +use async_trait::async_trait; + +use super::AppData; +use super::AppendEntriesRequest; +use super::AppendEntriesResponse; +use super::InstallSnapshotRequest; +use super::InstallSnapshotResponse; +use super::NodeId; +use super::VoteRequest; +use super::VoteResponse; + +/// A trait defining the interface for a Raft network between cluster members. +/// +/// See the [network chapter of the guide](https://datafuselabs.github.io/openraft/network.html) +/// for details and discussion on this trait and how to implement it. +#[async_trait] +pub trait RaftNetwork: Send + Sync + 'static +where D: AppData +{ + /// Send an AppendEntries RPC to the target Raft node (§5). + async fn send_append_entries(&self, target: NodeId, rpc: AppendEntriesRequest) -> Result; + + /// Send an InstallSnapshot RPC to the target Raft node (§7). + async fn send_install_snapshot( + &self, + target: NodeId, + rpc: InstallSnapshotRequest, + ) -> Result; + + /// Send a RequestVote RPC to the target Raft node (§5). + async fn send_vote(&self, target: NodeId, rpc: VoteRequest) -> Result; +} diff --git a/openraft/src/types/v070/rpc.rs b/openraft/src/types/v070/rpc.rs new file mode 100644 index 000000000..2f76a3ddb --- /dev/null +++ b/openraft/src/types/v070/rpc.rs @@ -0,0 +1,122 @@ +use serde::Deserialize; +use serde::Serialize; + +use super::AppData; +use super::AppDataResponse; +use super::Entry; +use super::EntryPayload; +use super::LogId; +use super::Membership; +use super::SnapshotMeta; + +/// An RPC sent by candidates to gather votes (§5.2). +#[derive(Debug, Serialize, Deserialize)] +pub struct VoteRequest { + /// The candidate's current term. + pub term: u64, + + pub candidate_id: u64, + + pub last_log_id: Option, +} + +/// The response to a `VoteRequest`. +#[derive(Debug, Serialize, Deserialize)] +pub struct VoteResponse { + /// The current term of the responding node, for the candidate to update itself. + pub term: u64, + + /// Will be true if the candidate received a vote from the responder. + pub vote_granted: bool, + + /// The last log id stored on the remote voter. + pub last_log_id: Option, +} + +/// An RPC sent by a cluster leader to replicate log entries (§5.3), and as a heartbeat (§5.2). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AppendEntriesRequest { + /// The leader's current term. + pub term: u64, + + /// The leader's ID. Useful in redirecting clients. + pub leader_id: u64, + + pub prev_log_id: Option, + + /// The new log entries to store. + /// + /// This may be empty when the leader is sending heartbeats. Entries + /// are batched for efficiency. + #[serde(bound = "D: AppData")] + pub entries: Vec>, + + /// The leader's committed log id. + pub leader_commit: Option, +} + +/// The response to an `AppendEntriesRequest`. +#[derive(Debug, Serialize, Deserialize)] +pub struct AppendEntriesResponse { + /// The responding node's current term, for leader to update itself. + pub term: u64, + + pub success: bool, + pub conflict: bool, +} + +/// An RPC sent by the Raft leader to send chunks of a snapshot to a follower (§7). +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct InstallSnapshotRequest { + /// The leader's current term. + pub term: u64, + /// The leader's ID. Useful in redirecting clients. + pub leader_id: u64, + + /// Metadata of a snapshot: snapshot_id, last_log_ed membership etc. + pub meta: SnapshotMeta, + + /// The byte offset where this chunk of data is positioned in the snapshot file. + pub offset: u64, + /// The raw bytes of the snapshot chunk, starting at `offset`. + pub data: Vec, + + /// Will be `true` if this is the last chunk in the snapshot. + pub done: bool, +} + +/// The response to an `InstallSnapshotRequest`. +#[derive(Debug, Serialize, Deserialize)] +pub struct InstallSnapshotResponse { + /// The receiving node's current term, for leader to update itself. + pub term: u64, +} + +/// An application specific client request to update the state of the system (§5.1). +/// +/// The entry of this payload will be appended to the Raft log and then applied to the Raft state +/// machine according to the Raft protocol. +#[derive(Debug, Serialize, Deserialize)] +pub struct ClientWriteRequest { + /// The application specific contents of this client request. + #[serde(bound = "D: AppData")] + pub(crate) payload: EntryPayload, +} + +/// The response to a `ClientRequest`. +#[derive(Debug, Serialize, Deserialize)] +pub struct ClientWriteResponse { + pub log_id: LogId, + + /// Application specific response data. + #[serde(bound = "R: AppDataResponse")] + pub data: R, + + /// If the log entry is a change-membership entry. + pub membership: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AddLearnerResponse { + pub matched: Option, +} diff --git a/openraft/src/types/v070/snapshot.rs b/openraft/src/types/v070/snapshot.rs new file mode 100644 index 000000000..0fcb93174 --- /dev/null +++ b/openraft/src/types/v070/snapshot.rs @@ -0,0 +1,38 @@ +use serde::Deserialize; +use serde::Serialize; +use tokio::io::AsyncRead; +use tokio::io::AsyncSeek; + +use super::LogId; + +// Everytime a snapshot is created, it is assigned with a globally unique id. +pub type SnapshotId = String; + +/// The identity of a segment of a snapshot. +#[derive(Debug, Default, Clone, PartialOrd, PartialEq, Eq, Serialize, Deserialize)] +pub struct SnapshotSegmentId { + pub id: SnapshotId, + pub offset: u64, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct SnapshotMeta { + // Log entries upto which this snapshot includes, inclusive. + pub last_log_id: LogId, + + /// To identify a snapshot when transferring. + /// Caveat: even when two snapshot is built with the same `last_log_id`, they still could be different in bytes. + pub snapshot_id: SnapshotId, +} + +/// The data associated with the current snapshot. +#[derive(Debug)] +pub struct Snapshot +where S: AsyncRead + AsyncSeek + Send + Unpin + 'static +{ + /// metadata of a snapshot + pub meta: SnapshotMeta, + + /// A read handle to the associated snapshot. + pub snapshot: Box, +} diff --git a/openraft/src/types/v070/state_machine.rs b/openraft/src/types/v070/state_machine.rs new file mode 100644 index 000000000..fc6c5fed7 --- /dev/null +++ b/openraft/src/types/v070/state_machine.rs @@ -0,0 +1,9 @@ +use super::LogId; + +/// The changes of a state machine. +/// E.g. when applying a log to state machine, or installing a state machine from snapshot. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct StateMachineChanges { + pub last_applied: LogId, + pub is_snapshot: bool, +} diff --git a/openraft/src/types/v070/storage.rs b/openraft/src/types/v070/storage.rs new file mode 100644 index 000000000..9506306fb --- /dev/null +++ b/openraft/src/types/v070/storage.rs @@ -0,0 +1,263 @@ +//! The Raft storage interface and data types. + +use std::fmt::Debug; +use std::ops::RangeBounds; + +use async_trait::async_trait; +use tokio::io::AsyncRead; +use tokio::io::AsyncSeek; +use tokio::io::AsyncWrite; + +use super::log_state::LogState; +use super::AppData; +use super::AppDataResponse; +use super::EffectiveMembership; +use super::Entry; +use super::EntryPayload; +use super::HardState; +use super::InitialState; +use super::LogId; +use super::Snapshot; +use super::SnapshotMeta; +use super::StateMachineChanges; +use super::StorageError; +use crate::defensive::check_range_matches_entries; +use crate::LogIdOptionExt; + +/// A trait defining the interface for a Raft storage system. +/// +/// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/storage.html) +/// for details and discussion on this trait and how to implement it. +#[async_trait] +pub trait RaftStorage: Send + Sync + 'static +where + D: AppData, + R: AppDataResponse, +{ + /// The storage engine's associated type used for exposing a snapshot for reading & writing. + /// + /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#implement-raftstorage) + /// for details on where and how this is used. + type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static; + + /// Returns the last membership config found in log or state machine. + async fn get_membership(&self) -> Result, StorageError> { + let (_, sm_mem) = self.last_applied_state().await?; + + let sm_mem_index = match &sm_mem { + None => 0, + Some(mem) => mem.log_id.index, + }; + + let log_mem = self.last_membership_in_log(sm_mem_index + 1).await?; + + if log_mem.is_some() { + return Ok(log_mem); + } + + return Ok(sm_mem); + } + + /// Get the latest membership config found in the log. + /// + /// This method should returns membership with the greatest log index which is `>=since_index`. + /// If no such membership log is found, it returns `None`, e.g., when logs are cleaned after being applied. + #[tracing::instrument(level = "trace", skip(self))] + async fn last_membership_in_log(&self, since_index: u64) -> Result, StorageError> { + let st = self.get_log_state().await?; + + let mut end = st.last_log_id.next_index(); + let start = std::cmp::max(st.last_purged_log_id.next_index(), since_index); + let step = 64; + + while start < end { + let step_start = std::cmp::max(start, end.saturating_sub(step)); + let entries = self.try_get_log_entries(step_start..end).await?; + + for ent in entries.iter().rev() { + if let EntryPayload::Membership(ref mem) = ent.payload { + return Ok(Some(EffectiveMembership { + log_id: ent.log_id, + membership: mem.clone(), + })); + } + } + + end = end.saturating_sub(step); + } + + Ok(None) + } + + /// Get Raft's state information from storage. + /// + /// When the Raft node is first started, it will call this interface to fetch the last known state from stable + /// storage. + async fn get_initial_state(&self) -> Result { + let hs = self.read_hard_state().await?; + let st = self.get_log_state().await?; + let mut last_log_id = st.last_log_id; + let (last_applied, _) = self.last_applied_state().await?; + let membership = self.get_membership().await?; + + // Clean up dirty state: snapshot is installed but logs are not cleaned. + if last_log_id < last_applied { + self.purge_logs_upto(last_applied.unwrap()).await?; + last_log_id = last_applied; + } + + Ok(InitialState { + last_log_id, + last_applied, + hard_state: hs.unwrap_or_default(), + last_membership: membership, + }) + } + + /// Get a series of log entries from storage. + /// + /// Similar to `try_get_log_entries` except an error will be returned if there is an entry not found in the + /// specified range. + async fn get_log_entries + Clone + Debug + Send + Sync>( + &self, + range: RB, + ) -> Result>, StorageError> { + let res = self.try_get_log_entries(range.clone()).await?; + + check_range_matches_entries(range, &res)?; + + Ok(res) + } + + /// Try to get an log entry. + /// + /// It does not return an error if the log entry at `log_index` is not found. + async fn try_get_log_entry(&self, log_index: u64) -> Result>, StorageError> { + let mut res = self.try_get_log_entries(log_index..(log_index + 1)).await?; + Ok(res.pop()) + } + + /// Get the log id of the entry at `index`. + async fn get_log_id(&self, log_index: u64) -> Result { + let st = self.get_log_state().await?; + + if Some(log_index) == st.last_purged_log_id.index() { + return Ok(st.last_purged_log_id.unwrap()); + } + + let entries = self.get_log_entries(log_index..=log_index).await?; + + Ok(entries[0].log_id) + } + + // --- Hard State + + async fn save_hard_state(&self, hs: &HardState) -> Result<(), StorageError>; + + async fn read_hard_state(&self) -> Result, StorageError>; + + // --- Log + + /// Returns the last deleted log id and the last log id. + /// + /// The impl should not consider the applied log id in state machine. + /// The returned `last_log_id` could be the log id of the last present log entry, or the `last_purged_log_id` if + /// there is no entry at all. + async fn get_log_state(&self) -> Result; + + /// Get a series of log entries from storage. + /// + /// The start value is inclusive in the search and the stop value is non-inclusive: `[start, stop)`. + /// + /// Entry that is not found is allowed. + async fn try_get_log_entries + Clone + Debug + Send + Sync>( + &self, + range: RB, + ) -> Result>, StorageError>; + + /// Append a payload of entries to the log. + /// + /// Though the entries will always be presented in order, each entry's index should be used to + /// determine its location to be written in the log. + async fn append_to_log(&self, entries: &[&Entry]) -> Result<(), StorageError>; + + /// Delete conflict log entries since `log_id`, inclusive. + async fn delete_conflict_logs_since(&self, log_id: LogId) -> Result<(), StorageError>; + + /// Delete applied log entries upto `log_id`, inclusive. + async fn purge_logs_upto(&self, log_id: LogId) -> Result<(), StorageError>; + + // --- State Machine + + /// Returns the last applied log id which is recorded in state machine, and the last applied membership log id and + /// membership config. + async fn last_applied_state(&self) -> Result<(Option, Option), StorageError>; + + /// Apply the given payload of entries to the state machine. + /// + /// The Raft protocol guarantees that only logs which have been _committed_, that is, logs which + /// have been replicated to a quorum of the cluster, will be applied to the state machine. + /// + /// This is where the business logic of interacting with your application's state machine + /// should live. This is 100% application specific. Perhaps this is where an application + /// specific transaction is being started, or perhaps committed. This may be where a key/value + /// is being stored. + /// + /// An impl should do: + /// - Store the last applied log id. + /// - Deal with the EntryPayload::Normal() log, which is business logic log. + /// - Deal with EntryPayload::Membership, store the membership config. + async fn apply_to_state_machine(&self, entries: &[&Entry]) -> Result, StorageError>; + + // --- Snapshot + + /// Build snapshot + /// + /// A snapshot has to contain information about exactly all logs upto the last applied. + /// + /// Building snapshot can be done by: + /// - Performing log compaction, e.g. merge log entries that operates on the same key, like a LSM-tree does, + /// - or by fetching a snapshot from the state machine. + async fn build_snapshot(&self) -> Result, StorageError>; + + /// Create a new blank snapshot, returning a writable handle to the snapshot object. + /// + /// Raft will use this handle to receive snapshot data. + /// + /// ### implementation guide + /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/storage.html) + /// for details on log compaction / snapshotting. + async fn begin_receiving_snapshot(&self) -> Result, StorageError>; + + /// Install a snapshot which has finished streaming from the cluster leader. + /// + /// All other snapshots should be deleted at this point. + /// + /// ### snapshot + /// A snapshot created from an earlier call to `begin_receiving_snapshot` which provided the snapshot. + async fn install_snapshot( + &self, + meta: &SnapshotMeta, + snapshot: Box, + ) -> Result; + + /// Get a readable handle to the current snapshot, along with its metadata. + /// + /// ### implementation algorithm + /// Implementing this method should be straightforward. Check the configured snapshot + /// directory for any snapshot files. A proper implementation will only ever have one + /// active snapshot, though another may exist while it is being created. As such, it is + /// recommended to use a file naming pattern which will allow for easily distinguishing between + /// the current live snapshot, and any new snapshot which is being created. + /// + /// A proper snapshot implementation will store the term, index and membership config as part + /// of the snapshot, which should be decoded for creating this method's response data. + async fn get_current_snapshot(&self) -> Result>, StorageError>; +} + +/// APIs for debugging a store. +#[async_trait] +pub trait RaftStorageDebug { + /// Get a handle to the state machine for testing purposes. + async fn get_state_machine(&self) -> SM; +} diff --git a/openraft/src/types/v070/storage_error.rs b/openraft/src/types/v070/storage_error.rs new file mode 100644 index 000000000..ea8c72e8b --- /dev/null +++ b/openraft/src/types/v070/storage_error.rs @@ -0,0 +1,139 @@ +use std::ops::Bound; + +use anyerror::AnyError; +use serde::Deserialize; +use serde::Serialize; + +use super::HardState; +use super::LogId; +use super::SnapshotMeta; + +/// An error that occurs when the RaftStore impl runs defensive check of input or output. +/// E.g. re-applying an log entry is a violation that may be a potential bug. +#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize)] +pub struct DefensiveError { + /// The subject that violates store defensive check, e.g. hard-state, log or state machine. + pub subject: ErrorSubject, + + /// The description of the violation. + pub violation: Violation, + + pub backtrace: String, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum ErrorSubject { + /// A general storage error + Store, + + /// HardState related error. + HardState, + + /// Error that is happened when operating a series of log entries + Logs, + + /// Error about a single log entry + Log(LogId), + + /// Error about a single log entry without knowing the log term. + LogIndex(u64), + + /// Error happened when applying a log entry + Apply(LogId), + + /// Error happened when operating state machine. + StateMachine, + + /// Error happened when operating snapshot. + Snapshot(SnapshotMeta), + + None, +} + +/// What it is doing when an error occurs. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum ErrorVerb { + Read, + Write, + Seek, + Delete, +} + +/// Violations a store would return when running defensive check. +#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize)] +pub enum Violation { + #[error("term can only be change to a greater value, current: {curr}, change to {to}")] + TermNotAscending { curr: u64, to: u64 }, + + #[error("voted_for can not change from Some() to other Some(), current: {curr:?}, change to {to:?}")] + VotedForChanged { curr: HardState, to: HardState }, + + #[error("log at higher index is obsolete: {higher_index_log_id:?} should GT {lower_index_log_id:?}")] + DirtyLog { + higher_index_log_id: LogId, + lower_index_log_id: LogId, + }, + + #[error("try to get log at index {want} but got {got:?}")] + LogIndexNotFound { want: u64, got: Option }, + + #[error("range is empty: start: {start:?}, end: {end:?}")] + RangeEmpty { start: Option, end: Option }, + + #[error("range is not half-open: start: {start:?}, end: {end:?}")] + RangeNotHalfOpen { start: Bound, end: Bound }, + + // TODO(xp): rename this to some input related error name. + #[error("empty log vector")] + LogsEmpty, + + #[error("all logs are removed. It requires at least one log to track continuity")] + StoreLogsEmpty, + + #[error("logs are not consecutive, prev: {prev:?}, next: {next}")] + LogsNonConsecutive { prev: Option, next: LogId }, + + #[error("invalid next log to apply: prev: {prev:?}, next: {next}")] + ApplyNonConsecutive { prev: Option, next: LogId }, + + #[error("applied log can not conflict, last_applied: {last_applied:?}, delete since: {first_conflict_log_id}")] + AppliedWontConflict { + last_applied: Option, + first_conflict_log_id: LogId, + }, + + #[error("not allowed to purge non-applied logs, last_applied: {last_applied:?}, purge upto: {purge_upto}")] + PurgeNonApplied { + last_applied: Option, + purge_upto: LogId, + }, +} + +/// A storage error could be either a defensive check error or an error occurred when doing the actual io operation. +#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize)] +pub enum StorageError { + /// An error raised by defensive check. + #[error(transparent)] + Defensive { + #[from] + #[backtrace] + source: DefensiveError, + }, + + /// An error raised by io operation. + #[error(transparent)] + IO { + #[from] + #[backtrace] + source: StorageIOError, + }, +} + +/// Error that occurs when operating the store. +#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize)] +pub struct StorageIOError { + pub(crate) subject: ErrorSubject, + pub(crate) verb: ErrorVerb, + pub(crate) source: AnyError, + pub(crate) backtrace: String, +}