From 92fb310eabe1e96b13538b4e56045cc9b3b8e106 Mon Sep 17 00:00:00 2001 From: Jay Date: Fri, 19 Jul 2019 19:45:53 +0800 Subject: [PATCH] support protobuf build by default (#264) And support prost optionally This is a derive work of #262 and #260. This PR uses protobuf-build to generate codes just as #262 does, but the generated codes are put inside target directory instead which is the same as #260. --- .travis.yml | 3 + Cargo.toml | 15 +- README.md | 4 + examples/five_mem_node/main.rs | 4 +- examples/single_mem_node/main.rs | 2 +- harness/Cargo.toml | 7 +- proto/Cargo.toml | 15 +- proto/build.rs | 63 +- proto/src/lib.rs | 11 +- proto/src/prost.rs | 8 - proto/src/prost/eraftpb.rs | 148 --- proto/src/prost/wrapper_eraftpb.rs | 916 ------------------ src/errors.rs | 16 +- src/lib.rs | 12 +- src/raft.rs | 105 +- src/raft_log.rs | 4 +- src/raw_node.rs | 20 +- src/storage.rs | 6 +- src/util.rs | 8 +- .../test_membership_changes.rs | 28 +- tests/integration_cases/test_raft.rs | 55 +- tests/integration_cases/test_raft_paper.rs | 26 +- tests/integration_cases/test_raw_node.rs | 31 +- tests/test_util/mod.rs | 4 +- 24 files changed, 202 insertions(+), 1309 deletions(-) delete mode 100644 proto/src/prost.rs delete mode 100644 proto/src/prost/eraftpb.rs delete mode 100644 proto/src/prost/wrapper_eraftpb.rs diff --git a/.travis.yml b/.travis.yml index f2efb23f3..b0e8cc126 100644 --- a/.travis.yml +++ b/.travis.yml @@ -31,7 +31,10 @@ install: script: - if [[ $TRAVIS_RUST_VERSION == "stable" && $TRAVIS_OS_NAME == "linux" ]]; then cargo fmt -- --check; fi - if [[ $TRAVIS_RUST_VERSION == "stable" && $TRAVIS_OS_NAME == "linux" ]]; then cargo clippy -- -D clippy::all; fi + - if [[ $TRAVIS_RUST_VERSION == "stable" && $TRAVIS_OS_NAME == "linux" ]]; then cargo clippy --no-default-features --features prost-codec -- -D clippy::all; fi - cargo test --all -- --nocapture + # There is a bug in protobuf-build that cached size is not supported yet. + # TODO: - cargo test --no-default-features --features prost-codec -- --nocapture # Validate benches still work. - cargo bench --all -- --test # Because failpoints inject failure in code path, which will affect all concurrently running tests, Hence they need to be synchronized, which make tests slow. diff --git a/Cargo.toml b/Cargo.toml index 4c3979be3..a16f3044e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,21 +16,22 @@ edition = "2018" members = ["proto"] [features] -default = [] +default = ["protobuf-codec"] # Enable failpoints failpoints = ["fail/failpoints"] +protobuf-codec = ["raft-proto/protobuf-codec", "harness/protobuf-codec"] +prost-codec = ["prost-derive", "bytes", "prost", "raft-proto/prost-codec", "harness/prost-codec"] # Make sure to synchronize updates with Harness. [dependencies] log = ">0.2" protobuf = "2" -lazy_static = "1.3.0" -prost = "0.5" -prost-derive = "0.5" -bytes = "0.4.11" +prost = { version = "0.5", optional = true } +prost-derive = { version = "0.5", optional = true } +bytes = { version = "0.4.11", optional = true } slog = "2.2" quick-error = "1.2.2" -raft-proto = { path = "proto" } +raft-proto = { path = "proto", default-features = false } rand = "0.7.0" hashbrown = "0.5" fail = { version = "0.3", optional = true } @@ -42,7 +43,7 @@ slog-envlogger = "2.1.0" [dev-dependencies] criterion = ">0.2.4" lazy_static = "1.0" -harness = { path = "harness" } +harness = { path = "harness", default-features = false } regex = "1.1" slog-async = "2.3.0" diff --git a/README.md b/README.md index d98f8a710..11c56acc3 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,10 @@ A complete Raft model contains 4 essential parts: > Note: This Raft implementation in Rust includes the core Consensus Module only, not the other parts. The core Consensus Module in the Raft crate is customizable, flexible, and resilient. You can directly use the Raft crate, but you will need to build your own Log, State Machine and Transport components. +## Using the raft crate + +You can use raft with either [rust-protobuf](https://github.com/pingcap/rust-protobuf) or [Prost](https://github.com/danburkert/prost) to encode/decode gRPC messages. We use rust-protobuf by default. To use Prost, build (or depend on) Raft using the `prost-codec` feature and without default features. + ## Developing the Raft crate `Raft` is built using the latest version of `stable` Rust, using [the 2018 edition](https://doc.rust-lang.org/edition-guide/rust-2018/). diff --git a/examples/five_mem_node/main.rs b/examples/five_mem_node/main.rs index ce3940a02..cd7365df4 100644 --- a/examples/five_mem_node/main.rs +++ b/examples/five_mem_node/main.rs @@ -16,7 +16,7 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use std::{str, thread}; -use prost::Message as ProstMsg; +use protobuf::Message as PbMessage; use raft::eraftpb::ConfState; use raft::storage::MemStorage; use raft::{prelude::*, StateRole}; @@ -261,7 +261,7 @@ fn on_ready( if let EntryType::EntryConfChange = entry.get_entry_type() { // For conf change messages, make them effective. let mut cc = ConfChange::default(); - ProstMsg::merge(&mut cc, &entry.data).unwrap(); + cc.merge_from_bytes(&entry.data).unwrap(); let node_id = cc.node_id; match cc.get_change_type() { ConfChangeType::AddNode => raft_group.raft.add_node(node_id).unwrap(), diff --git a/examples/single_mem_node/main.rs b/examples/single_mem_node/main.rs index 1227d06c2..6743bab50 100644 --- a/examples/single_mem_node/main.rs +++ b/examples/single_mem_node/main.rs @@ -171,7 +171,7 @@ fn on_ready(r: &mut RawNode, cbs: &mut HashMap) continue; } - if entry.entry_type() == EntryType::EntryNormal { + if entry.get_entry_type() == EntryType::EntryNormal { if let Some(cb) = cbs.remove(entry.data.get(0).unwrap()) { cb(); } diff --git a/harness/Cargo.toml b/harness/Cargo.toml index 8a811da6d..277781328 100644 --- a/harness/Cargo.toml +++ b/harness/Cargo.toml @@ -11,9 +11,14 @@ description = "A testing harness for Raft." categories = [] edition = "2018" +[features] +default = ["protobuf-codec"] +protobuf-codec = ["raft/protobuf-codec"] +prost-codec = ["raft/prost-codec"] + # Make sure to synchronize updates with Raft. [dependencies] -raft = { path = ".." } +raft = { path = "..", default-features = false } rand = "0.7.0" slog = "2.2" slog-term = "2.4.0" diff --git a/proto/Cargo.toml b/proto/Cargo.toml index 494921dd5..4fde2c1ea 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -13,15 +13,16 @@ categories = ["algorithms", "database-implementations"] build = "build.rs" [features] -default = [] -gen = [] +default = ["protobuf-codec"] +protobuf-codec = ["protobuf-build/protobuf-codec"] +prost-codec = ["prost", "prost-derive", "bytes", "lazy_static", "protobuf-build/prost-codec"] [build-dependencies] -protobuf-build = "0.6" +protobuf-build = { git = "https://github.com/tikv/protobuf-build.git", rev = "54c089895e4bf42481c2af46ebf73295c0e5d6b9", default-features = false } [dependencies] -bytes = "0.4.11" -lazy_static = "1.3.0" -prost = "0.5" -prost-derive = "0.5" +bytes = { version = "0.4.11", optional = true } +lazy_static = { version = "1.3.0", optional = true } +prost = { version = "0.5", optional = true } +prost-derive = { version = "0.5", optional = true } protobuf = "2" diff --git a/proto/build.rs b/proto/build.rs index 7ba53fb4b..cacecca53 100644 --- a/proto/build.rs +++ b/proto/build.rs @@ -1,22 +1,11 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use protobuf_build::*; -use std::fs::{read_dir, File}; -use std::io::Write; -use std::path::Path; +use std::env; +use std::fs::read_dir; fn main() { - // This build script creates files in the `src` directory. Since that is - // outside Cargo's OUT_DIR it will cause an error when this crate is used - // as a dependency. Therefore, the user must opt-in to regenerating the - // Rust files. - if !cfg!(feature = "gen") { - println!("cargo:rerun-if-changed=build.rs"); - return; - } - - check_protoc_version(); - + let out_dir = format!("{}/protos", env::var("OUT_DIR").unwrap()); let file_names: Vec<_> = read_dir("proto") .expect("Couldn't read proto directory") .filter_map(|e| { @@ -28,47 +17,9 @@ fn main() { } }) .collect(); - - for f in &file_names { - println!("cargo:rerun-if-changed={}", f); - } - - // Generate Prost output. - generate_prost_files(&file_names, "src/prost"); - let mod_names = module_names_for_dir("src/prost"); - generate_wrappers( - &mod_names - .iter() - .map(|m| format!("src/prost/{}.rs", m)) - .collect::>(), - "src/prost", - GenOpt::MUT | GenOpt::HAS | GenOpt::TAKE | GenOpt::CLEAR | GenOpt::MESSAGE, + generate_files( + &["include".to_owned(), "proto".to_owned()], + &file_names, + &out_dir, ); - generate_prost_rs(&mod_names); -} - -fn generate_prost_rs(mod_names: &[String]) { - let mut text = "#![allow(dead_code)]\n\ - #![allow(missing_docs)]\n\ - #![allow(clippy::all)]\n\n" - .to_owned(); - - for mod_name in mod_names { - text.push_str("pub mod "); - text.push_str(mod_name); - text.push_str("{\n"); - text.push_str("include!(\"prost/"); - text.push_str(mod_name); - text.push_str(".rs\");"); - text.push_str("include!(\"prost/wrapper_"); - text.push_str(mod_name); - text.push_str(".rs\");"); - text.push_str("}\n\n"); - } - - let prost_rs = Path::new("src/prost.rs"); - let mut lib = File::create(&prost_rs).expect("Could not create prost.rs"); - lib.write_all(text.as_bytes()) - .expect("Could not write prost.rs"); - rustfmt(prost_rs); } diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 0b2c2d8ee..8f5437e5f 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -1,8 +1,15 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. -pub use crate::prost::eraftpb; +pub use crate::protos::eraftpb; -mod prost; +#[allow(dead_code)] +#[allow(unknown_lints)] +#[allow(clippy::all)] +#[allow(renamed_and_removed_lints)] +#[allow(bare_trait_objects)] +mod protos { + include!(concat!(env!("OUT_DIR"), "/protos/mod.rs")); +} pub mod prelude { pub use crate::eraftpb::{ diff --git a/proto/src/prost.rs b/proto/src/prost.rs deleted file mode 100644 index 085a2ed96..000000000 --- a/proto/src/prost.rs +++ /dev/null @@ -1,8 +0,0 @@ -#![allow(dead_code)] -#![allow(missing_docs)] -#![allow(clippy::all)] - -pub mod eraftpb { - include!("prost/eraftpb.rs"); - include!("prost/wrapper_eraftpb.rs"); -} diff --git a/proto/src/prost/eraftpb.rs b/proto/src/prost/eraftpb.rs deleted file mode 100644 index f8c9174e7..000000000 --- a/proto/src/prost/eraftpb.rs +++ /dev/null @@ -1,148 +0,0 @@ -/// The entry is a type of change that needs to be applied. It contains two data fields. -/// While the fields are built into the model; their usage is determined by the entry_type. -/// -/// For normal entries, the data field should contain the data change that should be applied. -/// The context field can be used for any contextual data that might be relevant to the -/// application of the data. -/// -/// For configuration changes, the data will contain the ConfChange message and the -/// context will provide anything needed to assist the configuration change. The context -/// if for the user to set and use in this case. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Entry { - #[prost(enumeration = "EntryType", tag = "1")] - pub entry_type: i32, - #[prost(uint64, tag = "2")] - pub term: u64, - #[prost(uint64, tag = "3")] - pub index: u64, - #[prost(bytes, tag = "4")] - pub data: std::vec::Vec, - #[prost(bytes, tag = "6")] - pub context: std::vec::Vec, - /// Deprecated! It is kept for backward compatibility. - /// TODO: remove it in the next major release. - #[prost(bool, tag = "5")] - pub sync_log: bool, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct SnapshotMetadata { - #[prost(message, optional, tag = "1")] - pub conf_state: ::std::option::Option, - #[prost(message, optional, tag = "4")] - pub pending_membership_change: ::std::option::Option, - #[prost(uint64, tag = "5")] - pub pending_membership_change_index: u64, - #[prost(uint64, tag = "2")] - pub index: u64, - #[prost(uint64, tag = "3")] - pub term: u64, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Snapshot { - #[prost(bytes, tag = "1")] - pub data: std::vec::Vec, - #[prost(message, optional, tag = "2")] - pub metadata: ::std::option::Option, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Message { - #[prost(enumeration = "MessageType", tag = "1")] - pub msg_type: i32, - #[prost(uint64, tag = "2")] - pub to: u64, - #[prost(uint64, tag = "3")] - pub from: u64, - #[prost(uint64, tag = "4")] - pub term: u64, - #[prost(uint64, tag = "5")] - pub log_term: u64, - #[prost(uint64, tag = "6")] - pub index: u64, - #[prost(message, repeated, tag = "7")] - pub entries: ::std::vec::Vec, - #[prost(uint64, tag = "8")] - pub commit: u64, - #[prost(message, optional, tag = "9")] - pub snapshot: ::std::option::Option, - #[prost(bool, tag = "10")] - pub reject: bool, - #[prost(uint64, tag = "11")] - pub reject_hint: u64, - #[prost(bytes, tag = "12")] - pub context: std::vec::Vec, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct HardState { - #[prost(uint64, tag = "1")] - pub term: u64, - #[prost(uint64, tag = "2")] - pub vote: u64, - #[prost(uint64, tag = "3")] - pub commit: u64, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ConfState { - #[prost(uint64, repeated, tag = "1")] - pub nodes: ::std::vec::Vec, - #[prost(uint64, repeated, tag = "2")] - pub learners: ::std::vec::Vec, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ConfChange { - #[prost(uint64, tag = "1")] - pub id: u64, - #[prost(enumeration = "ConfChangeType", tag = "2")] - pub change_type: i32, - /// Used in `AddNode`, `RemoveNode`, and `AddLearnerNode`. - #[prost(uint64, tag = "3")] - pub node_id: u64, - #[prost(bytes, tag = "4")] - pub context: std::vec::Vec, - /// Used in `BeginMembershipChange` and `FinalizeMembershipChange`. - #[prost(message, optional, tag = "5")] - pub configuration: ::std::option::Option, - /// Used in `BeginMembershipChange` and `FinalizeMembershipChange`. - /// Because `RawNode::apply_conf_change` takes a `ConfChange` instead of an `Entry` we must - /// include this index so it can be known. - #[prost(uint64, tag = "6")] - pub start_index: u64, -} -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] -#[repr(i32)] -pub enum EntryType { - EntryNormal = 0, - EntryConfChange = 1, -} -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] -#[repr(i32)] -pub enum MessageType { - MsgHup = 0, - MsgBeat = 1, - MsgPropose = 2, - MsgAppend = 3, - MsgAppendResponse = 4, - MsgRequestVote = 5, - MsgRequestVoteResponse = 6, - MsgSnapshot = 7, - MsgHeartbeat = 8, - MsgHeartbeatResponse = 9, - MsgUnreachable = 10, - MsgSnapStatus = 11, - MsgCheckQuorum = 12, - MsgTransferLeader = 13, - MsgTimeoutNow = 14, - MsgReadIndex = 15, - MsgReadIndexResp = 16, - MsgRequestPreVote = 17, - MsgRequestPreVoteResponse = 18, -} -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] -#[repr(i32)] -pub enum ConfChangeType { - AddNode = 0, - RemoveNode = 1, - AddLearnerNode = 2, - BeginMembershipChange = 3, - FinalizeMembershipChange = 4, -} diff --git a/proto/src/prost/wrapper_eraftpb.rs b/proto/src/prost/wrapper_eraftpb.rs deleted file mode 100644 index 0a39f3097..000000000 --- a/proto/src/prost/wrapper_eraftpb.rs +++ /dev/null @@ -1,916 +0,0 @@ -// Generated file, please don't edit manually. - -impl Entry { - #[inline] - pub fn default_ref() -> &'static Self { - ::protobuf::Message::default_instance() - } - #[inline] - pub fn clear_entry_type(&mut self) { - self.entry_type = 0 - } - #[inline] - pub fn set_entry_type_(&mut self, v: EntryType) { - self.entry_type = unsafe { ::std::mem::transmute::(v) }; - } - #[inline] - pub fn get_entry_type(&self) -> EntryType { - unsafe { ::std::mem::transmute::(self.entry_type) } - } - #[inline] - pub fn clear_term(&mut self) { - self.term = 0 - } - #[inline] - pub fn clear_index(&mut self) { - self.index = 0 - } - #[inline] - pub fn clear_data(&mut self) { - self.data.clear(); - } - #[inline] - pub fn mut_data(&mut self) -> &mut std::vec::Vec { - &mut self.data - } - #[inline] - pub fn take_data(&mut self) -> std::vec::Vec { - ::std::mem::replace(&mut self.data, ::std::vec::Vec::new()) - } - #[inline] - pub fn clear_context(&mut self) { - self.context.clear(); - } - #[inline] - pub fn mut_context(&mut self) -> &mut std::vec::Vec { - &mut self.context - } - #[inline] - pub fn take_context(&mut self) -> std::vec::Vec { - ::std::mem::replace(&mut self.context, ::std::vec::Vec::new()) - } - #[inline] - pub fn clear_sync_log(&mut self) { - self.sync_log = false - } -} -impl ::protobuf::Clear for Entry { - fn clear(&mut self) { - ::prost::Message::clear(self); - } -} -impl ::protobuf::Message for Entry { - fn compute_size(&self) -> u32 { - ::prost::Message::encoded_len(self) as u32 - } - fn get_cached_size(&self) -> u32 { - ::prost::Message::encoded_len(self) as u32 - } - fn as_any(&self) -> &dyn::std::any::Any { - self as &dyn::std::any::Any - } - fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { - Self::descriptor_static() - } - fn new() -> Self { - Self::default() - } - fn default_instance() -> &'static Entry { - ::lazy_static::lazy_static! { - static ref INSTANCE: Entry = Entry::default(); - } - &*INSTANCE - } - fn is_initialized(&self) -> bool { - true - } - fn write_to_with_cached_sizes( - &self, - _os: &mut ::protobuf::CodedOutputStream, - ) -> ::protobuf::ProtobufResult<()> { - unimplemented!(); - } - fn merge_from( - &mut self, - _is: &mut ::protobuf::CodedInputStream, - ) -> ::protobuf::ProtobufResult<()> { - unimplemented!(); - } - fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { - unimplemented!(); - } - fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { - unimplemented!(); - } - fn write_to_bytes(&self) -> ::protobuf::ProtobufResult> { - let mut buf = Vec::new(); - if ::prost::Message::encode(self, &mut buf).is_err() { - return Err(::protobuf::ProtobufError::WireError( - ::protobuf::error::WireError::Other, - )); - } - Ok(buf) - } - fn merge_from_bytes(&mut self, bytes: &[u8]) -> ::protobuf::ProtobufResult<()> { - if ::prost::Message::merge(self, bytes).is_err() { - return Err(::protobuf::ProtobufError::WireError( - ::protobuf::error::WireError::Other, - )); - } - Ok(()) - } -} -impl SnapshotMetadata { - #[inline] - pub fn default_ref() -> &'static Self { - ::protobuf::Message::default_instance() - } - #[inline] - pub fn has_conf_state(&self) -> bool { - self.conf_state.is_some() - } - #[inline] - pub fn clear_conf_state(&mut self) { - self.conf_state = ::std::option::Option::None - } - #[inline] - pub fn set_conf_state(&mut self, v: ConfState) { - self.conf_state = ::std::option::Option::Some(v); - } - #[inline] - pub fn get_conf_state(&self) -> &ConfState { - match self.conf_state.as_ref() { - Some(v) => v, - None => ConfState::default_ref(), - } - } - #[inline] - pub fn mut_conf_state(&mut self) -> &mut ConfState { - if self.conf_state.is_none() { - self.conf_state = ::std::option::Option::Some(ConfState::default()); - } - self.conf_state.as_mut().unwrap() - } - #[inline] - pub fn take_conf_state(&mut self) -> ConfState { - self.conf_state.take().unwrap_or_else(ConfState::default) - } - #[inline] - pub fn has_pending_membership_change(&self) -> bool { - self.pending_membership_change.is_some() - } - #[inline] - pub fn clear_pending_membership_change(&mut self) { - self.pending_membership_change = ::std::option::Option::None - } - #[inline] - pub fn set_pending_membership_change(&mut self, v: ConfState) { - self.pending_membership_change = ::std::option::Option::Some(v); - } - #[inline] - pub fn get_pending_membership_change(&self) -> &ConfState { - match self.pending_membership_change.as_ref() { - Some(v) => v, - None => ConfState::default_ref(), - } - } - #[inline] - pub fn mut_pending_membership_change(&mut self) -> &mut ConfState { - if self.pending_membership_change.is_none() { - self.pending_membership_change = ::std::option::Option::Some(ConfState::default()); - } - self.pending_membership_change.as_mut().unwrap() - } - #[inline] - pub fn take_pending_membership_change(&mut self) -> ConfState { - self.pending_membership_change - .take() - .unwrap_or_else(ConfState::default) - } - #[inline] - pub fn clear_pending_membership_change_index(&mut self) { - self.pending_membership_change_index = 0 - } - #[inline] - pub fn clear_index(&mut self) { - self.index = 0 - } - #[inline] - pub fn clear_term(&mut self) { - self.term = 0 - } -} -impl ::protobuf::Clear for SnapshotMetadata { - fn clear(&mut self) { - ::prost::Message::clear(self); - } -} -impl ::protobuf::Message for SnapshotMetadata { - fn compute_size(&self) -> u32 { - ::prost::Message::encoded_len(self) as u32 - } - fn get_cached_size(&self) -> u32 { - ::prost::Message::encoded_len(self) as u32 - } - fn as_any(&self) -> &dyn::std::any::Any { - self as &dyn::std::any::Any - } - fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { - Self::descriptor_static() - } - fn new() -> Self { - Self::default() - } - fn default_instance() -> &'static SnapshotMetadata { - ::lazy_static::lazy_static! { - static ref INSTANCE: SnapshotMetadata = SnapshotMetadata::default(); - } - &*INSTANCE - } - fn is_initialized(&self) -> bool { - true - } - fn write_to_with_cached_sizes( - &self, - _os: &mut ::protobuf::CodedOutputStream, - ) -> ::protobuf::ProtobufResult<()> { - unimplemented!(); - } - fn merge_from( - &mut self, - _is: &mut ::protobuf::CodedInputStream, - ) -> ::protobuf::ProtobufResult<()> { - unimplemented!(); - } - fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { - unimplemented!(); - } - fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { - unimplemented!(); - } - fn write_to_bytes(&self) -> ::protobuf::ProtobufResult> { - let mut buf = Vec::new(); - if ::prost::Message::encode(self, &mut buf).is_err() { - return Err(::protobuf::ProtobufError::WireError( - ::protobuf::error::WireError::Other, - )); - } - Ok(buf) - } - fn merge_from_bytes(&mut self, bytes: &[u8]) -> ::protobuf::ProtobufResult<()> { - if ::prost::Message::merge(self, bytes).is_err() { - return Err(::protobuf::ProtobufError::WireError( - ::protobuf::error::WireError::Other, - )); - } - Ok(()) - } -} -impl Snapshot { - #[inline] - pub fn default_ref() -> &'static Self { - ::protobuf::Message::default_instance() - } - #[inline] - pub fn clear_data(&mut self) { - self.data.clear(); - } - #[inline] - pub fn mut_data(&mut self) -> &mut std::vec::Vec { - &mut self.data - } - #[inline] - pub fn take_data(&mut self) -> std::vec::Vec { - ::std::mem::replace(&mut self.data, ::std::vec::Vec::new()) - } - #[inline] - pub fn has_metadata(&self) -> bool { - self.metadata.is_some() - } - #[inline] - pub fn clear_metadata(&mut self) { - self.metadata = ::std::option::Option::None - } - #[inline] - pub fn set_metadata(&mut self, v: SnapshotMetadata) { - self.metadata = ::std::option::Option::Some(v); - } - #[inline] - pub fn get_metadata(&self) -> &SnapshotMetadata { - match self.metadata.as_ref() { - Some(v) => v, - None => SnapshotMetadata::default_ref(), - } - } - #[inline] - pub fn mut_metadata(&mut self) -> &mut SnapshotMetadata { - if self.metadata.is_none() { - self.metadata = ::std::option::Option::Some(SnapshotMetadata::default()); - } - self.metadata.as_mut().unwrap() - } - #[inline] - pub fn take_metadata(&mut self) -> SnapshotMetadata { - self.metadata - .take() - .unwrap_or_else(SnapshotMetadata::default) - } -} -impl ::protobuf::Clear for Snapshot { - fn clear(&mut self) { - ::prost::Message::clear(self); - } -} -impl ::protobuf::Message for Snapshot { - fn compute_size(&self) -> u32 { - ::prost::Message::encoded_len(self) as u32 - } - fn get_cached_size(&self) -> u32 { - ::prost::Message::encoded_len(self) as u32 - } - fn as_any(&self) -> &dyn::std::any::Any { - self as &dyn::std::any::Any - } - fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { - Self::descriptor_static() - } - fn new() -> Self { - Self::default() - } - fn default_instance() -> &'static Snapshot { - ::lazy_static::lazy_static! { - static ref INSTANCE: Snapshot = Snapshot::default(); - } - &*INSTANCE - } - fn is_initialized(&self) -> bool { - true - } - fn write_to_with_cached_sizes( - &self, - _os: &mut ::protobuf::CodedOutputStream, - ) -> ::protobuf::ProtobufResult<()> { - unimplemented!(); - } - fn merge_from( - &mut self, - _is: &mut ::protobuf::CodedInputStream, - ) -> ::protobuf::ProtobufResult<()> { - unimplemented!(); - } - fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { - unimplemented!(); - } - fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { - unimplemented!(); - } - fn write_to_bytes(&self) -> ::protobuf::ProtobufResult> { - let mut buf = Vec::new(); - if ::prost::Message::encode(self, &mut buf).is_err() { - return Err(::protobuf::ProtobufError::WireError( - ::protobuf::error::WireError::Other, - )); - } - Ok(buf) - } - fn merge_from_bytes(&mut self, bytes: &[u8]) -> ::protobuf::ProtobufResult<()> { - if ::prost::Message::merge(self, bytes).is_err() { - return Err(::protobuf::ProtobufError::WireError( - ::protobuf::error::WireError::Other, - )); - } - Ok(()) - } -} -impl Message { - #[inline] - pub fn default_ref() -> &'static Self { - ::protobuf::Message::default_instance() - } - #[inline] - pub fn clear_msg_type(&mut self) { - self.msg_type = 0 - } - #[inline] - pub fn set_msg_type_(&mut self, v: MessageType) { - self.msg_type = unsafe { ::std::mem::transmute::(v) }; - } - #[inline] - pub fn get_msg_type(&self) -> MessageType { - unsafe { ::std::mem::transmute::(self.msg_type) } - } - #[inline] - pub fn clear_to(&mut self) { - self.to = 0 - } - #[inline] - pub fn clear_from(&mut self) { - self.from = 0 - } - #[inline] - pub fn clear_term(&mut self) { - self.term = 0 - } - #[inline] - pub fn clear_log_term(&mut self) { - self.log_term = 0 - } - #[inline] - pub fn clear_index(&mut self) { - self.index = 0 - } - #[inline] - pub fn clear_entries(&mut self) { - self.entries.clear(); - } - #[inline] - pub fn mut_entries(&mut self) -> &mut ::std::vec::Vec { - &mut self.entries - } - #[inline] - pub fn take_entries(&mut self) -> ::std::vec::Vec { - ::std::mem::replace(&mut self.entries, ::std::vec::Vec::new()) - } - #[inline] - pub fn clear_commit(&mut self) { - self.commit = 0 - } - #[inline] - pub fn has_snapshot(&self) -> bool { - self.snapshot.is_some() - } - #[inline] - pub fn clear_snapshot(&mut self) { - self.snapshot = ::std::option::Option::None - } - #[inline] - pub fn set_snapshot(&mut self, v: Snapshot) { - self.snapshot = ::std::option::Option::Some(v); - } - #[inline] - pub fn get_snapshot(&self) -> &Snapshot { - match self.snapshot.as_ref() { - Some(v) => v, - None => Snapshot::default_ref(), - } - } - #[inline] - pub fn mut_snapshot(&mut self) -> &mut Snapshot { - if self.snapshot.is_none() { - self.snapshot = ::std::option::Option::Some(Snapshot::default()); - } - self.snapshot.as_mut().unwrap() - } - #[inline] - pub fn take_snapshot(&mut self) -> Snapshot { - self.snapshot.take().unwrap_or_else(Snapshot::default) - } - #[inline] - pub fn clear_reject(&mut self) { - self.reject = false - } - #[inline] - pub fn clear_reject_hint(&mut self) { - self.reject_hint = 0 - } - #[inline] - pub fn clear_context(&mut self) { - self.context.clear(); - } - #[inline] - pub fn mut_context(&mut self) -> &mut std::vec::Vec { - &mut self.context - } - #[inline] - pub fn take_context(&mut self) -> std::vec::Vec { - ::std::mem::replace(&mut self.context, ::std::vec::Vec::new()) - } -} -impl ::protobuf::Clear for Message { - fn clear(&mut self) { - ::prost::Message::clear(self); - } -} -impl ::protobuf::Message for Message { - fn compute_size(&self) -> u32 { - ::prost::Message::encoded_len(self) as u32 - } - fn get_cached_size(&self) -> u32 { - ::prost::Message::encoded_len(self) as u32 - } - fn as_any(&self) -> &dyn::std::any::Any { - self as &dyn::std::any::Any - } - fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { - Self::descriptor_static() - } - fn new() -> Self { - Self::default() - } - fn default_instance() -> &'static Message { - ::lazy_static::lazy_static! { - static ref INSTANCE: Message = Message::default(); - } - &*INSTANCE - } - fn is_initialized(&self) -> bool { - true - } - fn write_to_with_cached_sizes( - &self, - _os: &mut ::protobuf::CodedOutputStream, - ) -> ::protobuf::ProtobufResult<()> { - unimplemented!(); - } - fn merge_from( - &mut self, - _is: &mut ::protobuf::CodedInputStream, - ) -> ::protobuf::ProtobufResult<()> { - unimplemented!(); - } - fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { - unimplemented!(); - } - fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { - unimplemented!(); - } - fn write_to_bytes(&self) -> ::protobuf::ProtobufResult> { - let mut buf = Vec::new(); - if ::prost::Message::encode(self, &mut buf).is_err() { - return Err(::protobuf::ProtobufError::WireError( - ::protobuf::error::WireError::Other, - )); - } - Ok(buf) - } - fn merge_from_bytes(&mut self, bytes: &[u8]) -> ::protobuf::ProtobufResult<()> { - if ::prost::Message::merge(self, bytes).is_err() { - return Err(::protobuf::ProtobufError::WireError( - ::protobuf::error::WireError::Other, - )); - } - Ok(()) - } -} -impl HardState { - #[inline] - pub fn default_ref() -> &'static Self { - ::protobuf::Message::default_instance() - } - #[inline] - pub fn clear_term(&mut self) { - self.term = 0 - } - #[inline] - pub fn clear_vote(&mut self) { - self.vote = 0 - } - #[inline] - pub fn clear_commit(&mut self) { - self.commit = 0 - } -} -impl ::protobuf::Clear for HardState { - fn clear(&mut self) { - ::prost::Message::clear(self); - } -} -impl ::protobuf::Message for HardState { - fn compute_size(&self) -> u32 { - ::prost::Message::encoded_len(self) as u32 - } - fn get_cached_size(&self) -> u32 { - ::prost::Message::encoded_len(self) as u32 - } - fn as_any(&self) -> &dyn::std::any::Any { - self as &dyn::std::any::Any - } - fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { - Self::descriptor_static() - } - fn new() -> Self { - Self::default() - } - fn default_instance() -> &'static HardState { - ::lazy_static::lazy_static! { - static ref INSTANCE: HardState = HardState::default(); - } - &*INSTANCE - } - fn is_initialized(&self) -> bool { - true - } - fn write_to_with_cached_sizes( - &self, - _os: &mut ::protobuf::CodedOutputStream, - ) -> ::protobuf::ProtobufResult<()> { - unimplemented!(); - } - fn merge_from( - &mut self, - _is: &mut ::protobuf::CodedInputStream, - ) -> ::protobuf::ProtobufResult<()> { - unimplemented!(); - } - fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { - unimplemented!(); - } - fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { - unimplemented!(); - } - fn write_to_bytes(&self) -> ::protobuf::ProtobufResult> { - let mut buf = Vec::new(); - if ::prost::Message::encode(self, &mut buf).is_err() { - return Err(::protobuf::ProtobufError::WireError( - ::protobuf::error::WireError::Other, - )); - } - Ok(buf) - } - fn merge_from_bytes(&mut self, bytes: &[u8]) -> ::protobuf::ProtobufResult<()> { - if ::prost::Message::merge(self, bytes).is_err() { - return Err(::protobuf::ProtobufError::WireError( - ::protobuf::error::WireError::Other, - )); - } - Ok(()) - } -} -impl ConfState { - #[inline] - pub fn default_ref() -> &'static Self { - ::protobuf::Message::default_instance() - } - #[inline] - pub fn clear_nodes(&mut self) { - self.nodes.clear(); - } - #[inline] - pub fn mut_nodes(&mut self) -> &mut ::std::vec::Vec { - &mut self.nodes - } - #[inline] - pub fn take_nodes(&mut self) -> ::std::vec::Vec { - ::std::mem::replace(&mut self.nodes, ::std::vec::Vec::new()) - } - #[inline] - pub fn clear_learners(&mut self) { - self.learners.clear(); - } - #[inline] - pub fn mut_learners(&mut self) -> &mut ::std::vec::Vec { - &mut self.learners - } - #[inline] - pub fn take_learners(&mut self) -> ::std::vec::Vec { - ::std::mem::replace(&mut self.learners, ::std::vec::Vec::new()) - } -} -impl ::protobuf::Clear for ConfState { - fn clear(&mut self) { - ::prost::Message::clear(self); - } -} -impl ::protobuf::Message for ConfState { - fn compute_size(&self) -> u32 { - ::prost::Message::encoded_len(self) as u32 - } - fn get_cached_size(&self) -> u32 { - ::prost::Message::encoded_len(self) as u32 - } - fn as_any(&self) -> &dyn::std::any::Any { - self as &dyn::std::any::Any - } - fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { - Self::descriptor_static() - } - fn new() -> Self { - Self::default() - } - fn default_instance() -> &'static ConfState { - ::lazy_static::lazy_static! { - static ref INSTANCE: ConfState = ConfState::default(); - } - &*INSTANCE - } - fn is_initialized(&self) -> bool { - true - } - fn write_to_with_cached_sizes( - &self, - _os: &mut ::protobuf::CodedOutputStream, - ) -> ::protobuf::ProtobufResult<()> { - unimplemented!(); - } - fn merge_from( - &mut self, - _is: &mut ::protobuf::CodedInputStream, - ) -> ::protobuf::ProtobufResult<()> { - unimplemented!(); - } - fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { - unimplemented!(); - } - fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { - unimplemented!(); - } - fn write_to_bytes(&self) -> ::protobuf::ProtobufResult> { - let mut buf = Vec::new(); - if ::prost::Message::encode(self, &mut buf).is_err() { - return Err(::protobuf::ProtobufError::WireError( - ::protobuf::error::WireError::Other, - )); - } - Ok(buf) - } - fn merge_from_bytes(&mut self, bytes: &[u8]) -> ::protobuf::ProtobufResult<()> { - if ::prost::Message::merge(self, bytes).is_err() { - return Err(::protobuf::ProtobufError::WireError( - ::protobuf::error::WireError::Other, - )); - } - Ok(()) - } -} -impl ConfChange { - #[inline] - pub fn default_ref() -> &'static Self { - ::protobuf::Message::default_instance() - } - #[inline] - pub fn clear_id(&mut self) { - self.id = 0 - } - #[inline] - pub fn clear_change_type(&mut self) { - self.change_type = 0 - } - #[inline] - pub fn set_change_type_(&mut self, v: ConfChangeType) { - self.change_type = unsafe { ::std::mem::transmute::(v) }; - } - #[inline] - pub fn get_change_type(&self) -> ConfChangeType { - unsafe { ::std::mem::transmute::(self.change_type) } - } - #[inline] - pub fn clear_node_id(&mut self) { - self.node_id = 0 - } - #[inline] - pub fn clear_context(&mut self) { - self.context.clear(); - } - #[inline] - pub fn mut_context(&mut self) -> &mut std::vec::Vec { - &mut self.context - } - #[inline] - pub fn take_context(&mut self) -> std::vec::Vec { - ::std::mem::replace(&mut self.context, ::std::vec::Vec::new()) - } - #[inline] - pub fn has_configuration(&self) -> bool { - self.configuration.is_some() - } - #[inline] - pub fn clear_configuration(&mut self) { - self.configuration = ::std::option::Option::None - } - #[inline] - pub fn set_configuration(&mut self, v: ConfState) { - self.configuration = ::std::option::Option::Some(v); - } - #[inline] - pub fn get_configuration(&self) -> &ConfState { - match self.configuration.as_ref() { - Some(v) => v, - None => ConfState::default_ref(), - } - } - #[inline] - pub fn mut_configuration(&mut self) -> &mut ConfState { - if self.configuration.is_none() { - self.configuration = ::std::option::Option::Some(ConfState::default()); - } - self.configuration.as_mut().unwrap() - } - #[inline] - pub fn take_configuration(&mut self) -> ConfState { - self.configuration.take().unwrap_or_else(ConfState::default) - } - #[inline] - pub fn clear_start_index(&mut self) { - self.start_index = 0 - } -} -impl ::protobuf::Clear for ConfChange { - fn clear(&mut self) { - ::prost::Message::clear(self); - } -} -impl ::protobuf::Message for ConfChange { - fn compute_size(&self) -> u32 { - ::prost::Message::encoded_len(self) as u32 - } - fn get_cached_size(&self) -> u32 { - ::prost::Message::encoded_len(self) as u32 - } - fn as_any(&self) -> &dyn::std::any::Any { - self as &dyn::std::any::Any - } - fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { - Self::descriptor_static() - } - fn new() -> Self { - Self::default() - } - fn default_instance() -> &'static ConfChange { - ::lazy_static::lazy_static! { - static ref INSTANCE: ConfChange = ConfChange::default(); - } - &*INSTANCE - } - fn is_initialized(&self) -> bool { - true - } - fn write_to_with_cached_sizes( - &self, - _os: &mut ::protobuf::CodedOutputStream, - ) -> ::protobuf::ProtobufResult<()> { - unimplemented!(); - } - fn merge_from( - &mut self, - _is: &mut ::protobuf::CodedInputStream, - ) -> ::protobuf::ProtobufResult<()> { - unimplemented!(); - } - fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { - unimplemented!(); - } - fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { - unimplemented!(); - } - fn write_to_bytes(&self) -> ::protobuf::ProtobufResult> { - let mut buf = Vec::new(); - if ::prost::Message::encode(self, &mut buf).is_err() { - return Err(::protobuf::ProtobufError::WireError( - ::protobuf::error::WireError::Other, - )); - } - Ok(buf) - } - fn merge_from_bytes(&mut self, bytes: &[u8]) -> ::protobuf::ProtobufResult<()> { - if ::prost::Message::merge(self, bytes).is_err() { - return Err(::protobuf::ProtobufError::WireError( - ::protobuf::error::WireError::Other, - )); - } - Ok(()) - } -} -impl EntryType { - pub fn values() -> &'static [Self] { - static VALUES: &'static [EntryType] = &[EntryType::EntryNormal, EntryType::EntryConfChange]; - VALUES - } -} -impl MessageType { - pub fn values() -> &'static [Self] { - static VALUES: &'static [MessageType] = &[ - MessageType::MsgHup, - MessageType::MsgBeat, - MessageType::MsgPropose, - MessageType::MsgAppend, - MessageType::MsgAppendResponse, - MessageType::MsgRequestVote, - MessageType::MsgRequestVoteResponse, - MessageType::MsgSnapshot, - MessageType::MsgHeartbeat, - MessageType::MsgHeartbeatResponse, - MessageType::MsgUnreachable, - MessageType::MsgSnapStatus, - MessageType::MsgCheckQuorum, - MessageType::MsgTransferLeader, - MessageType::MsgTimeoutNow, - MessageType::MsgReadIndex, - MessageType::MsgReadIndexResp, - MessageType::MsgRequestPreVote, - MessageType::MsgRequestPreVoteResponse, - ]; - VALUES - } -} -impl ConfChangeType { - pub fn values() -> &'static [Self] { - static VALUES: &'static [ConfChangeType] = &[ - ConfChangeType::AddNode, - ConfChangeType::RemoveNode, - ConfChangeType::AddLearnerNode, - ConfChangeType::BeginMembershipChange, - ConfChangeType::FinalizeMembershipChange, - ]; - VALUES - } -} diff --git a/src/errors.rs b/src/errors.rs index 84b8a9886..a9076d149 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -12,11 +12,10 @@ // limitations under the License. use crate::StateRole; +use protobuf::ProtobufError; use std::error; use std::{cmp, io, result}; -use prost::{DecodeError, EncodeError}; - quick_error! { /// The base error type for raft #[derive(Debug)] @@ -49,19 +48,12 @@ quick_error! { ConfigInvalid(desc: String) { description(desc) } - /// A Prost message encode failed in some manner. - ProstEncode(err: EncodeError) { - from() - cause(err) - description(err.description()) - display("prost encode error {:?}", err) - } - /// A Prost message decode failed in some manner. - ProstDecode(err: DecodeError) { + /// A protobuf message codec failed in some manner. + CodecError(err: ProtobufError) { from() cause(err) description(err.description()) - display("prost decode error {:?}", err) + display("protobuf codec error {:?}", err) } /// The node exists, but should not. Exists(id: u64, set: &'static str) { diff --git a/src/lib.rs b/src/lib.rs index 30ebd493b..8e0856e40 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -269,7 +269,7 @@ need to update the applied index and resume `apply` later: continue; } - match entry.entry_type() { + match entry.get_entry_type() { EntryType::EntryNormal => handle_normal(entry), EntryType::EntryConfChange => handle_conf_change(entry), } @@ -317,7 +317,7 @@ This means it's possible to do: ```rust use raft::{Config, storage::MemStorage, raw_node::RawNode, eraftpb::*}; -use prost::Message as ProstMsg; +use protobuf::Message as PbMessage; let mut config = Config { id: 1, ..Default::default() }; let store = MemStorage::new_with_conf_state((vec![1, 2], vec![])); let mut node = RawNode::new(&mut config, store).unwrap(); @@ -336,7 +336,8 @@ node.raft.propose_membership_change(( # let entry = &node.raft.raft_log.entries(idx, 1).unwrap()[0]; // ...Later when the begin entry is recieved from a `ready()` in the `entries` field... -let conf_change = ConfChange::decode(&entry.data).unwrap(); +let mut conf_change = ConfChange::default(); +conf_change.merge_from_bytes(&entry.data).unwrap(); node.raft.begin_membership_change(&conf_change).unwrap(); assert!(node.raft.is_in_membership_change()); assert!(node.raft.prs().voter_ids().contains(&2)); @@ -350,7 +351,8 @@ assert!(node.raft.prs().voter_ids().contains(&3)); # let idx = node.raft.raft_log.last_index(); # let entry = &node.raft.raft_log.entries(idx, 1).unwrap()[0]; // ...Later, when the finalize entry is recieved from a `ready()` in the `entries` field... -let conf_change = ConfChange::decode(&entry.data).unwrap(); +let mut conf_change = ConfChange::default(); +conf_change.merge_from_bytes(&entry.data).unwrap(); node.raft.finalize_membership_change(&conf_change).unwrap(); assert!(!node.raft.prs().voter_ids().contains(&2)); assert!(node.raft.prs().voter_ids().contains(&3)); @@ -369,6 +371,8 @@ before taking old, removed peers offline. #![deny(clippy::all)] #![deny(missing_docs)] #![recursion_limit = "128"] +// This is necessary to support prost and rust-protobuf at the same time. +#![allow(clippy::identity_conversion)] #[cfg(feature = "failpoints")] #[macro_use] diff --git a/src/raft.rs b/src/raft.rs index dac0c7974..8dba86a78 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -31,7 +31,7 @@ use crate::eraftpb::{ ConfChange, ConfChangeType, Entry, EntryType, HardState, Message, MessageType, Snapshot, }; use hashbrown::{HashMap, HashSet}; -use prost::Message as ProstMsg; +use protobuf::Message as PbMessage; use rand::{self, Rng}; use slog::{self, Logger}; @@ -467,10 +467,10 @@ impl Raft { "msg" => ?m, ); m.from = self.id; - if m.msg_type() == MessageType::MsgRequestVote - || m.msg_type() == MessageType::MsgRequestPreVote - || m.msg_type() == MessageType::MsgRequestVoteResponse - || m.msg_type() == MessageType::MsgRequestPreVoteResponse + if m.get_msg_type() == MessageType::MsgRequestVote + || m.get_msg_type() == MessageType::MsgRequestPreVote + || m.get_msg_type() == MessageType::MsgRequestVoteResponse + || m.get_msg_type() == MessageType::MsgRequestPreVoteResponse { if m.term == 0 { // All {pre-,}campaign messages need to have the term set when @@ -488,7 +488,7 @@ impl Raft { panic!( "{} term should be set when sending {:?}", self.tag, - m.msg_type() + m.get_msg_type() ); } } else { @@ -496,7 +496,7 @@ impl Raft { panic!( "{} term should not be set when sending {:?} (was {})", self.tag, - m.msg_type(), + m.get_msg_type(), m.term ); } @@ -504,7 +504,8 @@ impl Raft { // proposals are a way to forward to the leader and // should be treated as local message. // MsgReadIndex is also forwarded to leader. - if m.msg_type() != MessageType::MsgPropose && m.msg_type() != MessageType::MsgReadIndex + if m.get_msg_type() != MessageType::MsgPropose + && m.get_msg_type() != MessageType::MsgReadIndex { m.term = self.term; } @@ -576,7 +577,7 @@ impl Raft { m.set_msg_type(MessageType::MsgAppend); m.index = pr.next_idx - 1; m.log_term = term; - m.entries = ents; + m.set_entries(ents.into()); m.commit = self.raft_log.committed; if !m.entries.is_empty() { let last = m.entries.last().unwrap().index; @@ -589,14 +590,14 @@ impl Raft { // will append the entries to the existing MsgAppend let mut is_batched = false; for msg in &mut self.msgs { - if msg.msg_type() == MessageType::MsgAppend && msg.to == to { + if msg.get_msg_type() == MessageType::MsgAppend && msg.to == to { if !ents.is_empty() { if !util::is_continuous_ents(msg, ents) { return is_batched; } - let mut batched_entries = msg.take_entries(); + let mut batched_entries: Vec<_> = msg.take_entries().into(); batched_entries.append(ents); - msg.entries = batched_entries; + msg.set_entries(batched_entries.into()); let last_idx = msg.entries.last().unwrap().index; pr.update_state(last_idx); } @@ -743,8 +744,7 @@ impl Raft { fn append_finalize_conf_change_entry(&mut self) { let mut conf_change = ConfChange::default(); conf_change.set_change_type(ConfChangeType::FinalizeMembershipChange); - let mut data = Vec::with_capacity(ProstMsg::encoded_len(&conf_change)); - conf_change.encode(&mut data).unwrap(); + let data = conf_change.write_to_bytes().unwrap(); let mut entry = Entry::default(); entry.set_entry_type(EntryType::EntryConfChange); entry.data = data; @@ -989,7 +989,7 @@ impl Raft { fn num_pending_conf(&self, ents: &[Entry]) -> usize { ents.iter() - .filter(|e| e.entry_type() == EntryType::EntryConfChange) + .filter(|e| e.get_entry_type() == EntryType::EntryConfChange) .count() } @@ -1067,8 +1067,8 @@ impl Raft { if m.term == 0 { // local message } else if m.term > self.term { - if m.msg_type() == MessageType::MsgRequestVote - || m.msg_type() == MessageType::MsgRequestPreVote + if m.get_msg_type() == MessageType::MsgRequestVote + || m.get_msg_type() == MessageType::MsgRequestPreVote { let force = m.context == CAMPAIGN_TRANSFER; let in_lease = self.check_quorum @@ -1095,15 +1095,15 @@ impl Raft { "term" => self.term, "remaining ticks" => self.election_timeout - self.election_elapsed, "tag" => &self.tag, - "msg type" => ?m.msg_type(), + "msg type" => ?m.get_msg_type(), ); return Ok(()); } } - if m.msg_type() == MessageType::MsgRequestPreVote - || (m.msg_type() == MessageType::MsgRequestPreVoteResponse && !m.reject) + if m.get_msg_type() == MessageType::MsgRequestPreVote + || (m.get_msg_type() == MessageType::MsgRequestPreVoteResponse && !m.reject) { // For a pre-vote request: // Never change our term in response to a pre-vote request. @@ -1122,11 +1122,11 @@ impl Raft { "tag" => &self.tag, "term" => self.term, "message_term" => m.term, - "msg type" => ?m.msg_type(), + "msg type" => ?m.get_msg_type(), ); - if m.msg_type() == MessageType::MsgAppend - || m.msg_type() == MessageType::MsgHeartbeat - || m.msg_type() == MessageType::MsgSnapshot + if m.get_msg_type() == MessageType::MsgAppend + || m.get_msg_type() == MessageType::MsgHeartbeat + || m.get_msg_type() == MessageType::MsgSnapshot { self.become_follower(m.term, m.from); } else { @@ -1135,8 +1135,8 @@ impl Raft { } } else if m.term < self.term { if (self.check_quorum || self.pre_vote) - && (m.msg_type() == MessageType::MsgHeartbeat - || m.msg_type() == MessageType::MsgAppend) + && (m.get_msg_type() == MessageType::MsgHeartbeat + || m.get_msg_type() == MessageType::MsgAppend) { // We have received messages from a leader at a lower term. It is possible // that these messages were simply delayed in the network, but this could @@ -1161,7 +1161,7 @@ impl Raft { // fresh election. This can be prevented with Pre-Vote phase. let to_send = new_message(m.from, MessageType::MsgAppendResponse, None); self.send(to_send); - } else if m.msg_type() == MessageType::MsgRequestPreVote { + } else if m.get_msg_type() == MessageType::MsgRequestPreVote { // Before pre_vote enable, there may be a recieving candidate with higher term, // but less log. After update to pre_vote, the cluster may deadlock if // we drop messages with a lower term. @@ -1172,7 +1172,7 @@ impl Raft { self.raft_log.last_term(), self.raft_log.last_index(), self.vote, - m.msg_type(), + m.get_msg_type(), m.from, m.log_term, m.index, @@ -1191,7 +1191,7 @@ impl Raft { from = m.from; "tag" => &self.tag, "term" => self.term, - "msg type" => ?m.msg_type(), + "msg type" => ?m.get_msg_type(), "msg term" => m.term ); } @@ -1201,7 +1201,7 @@ impl Raft { #[cfg(feature = "failpoints")] fail_point!("before_step"); - match m.msg_type() { + match m.get_msg_type() { MessageType::MsgHup => self.hup(false), MessageType::MsgRequestVote | MessageType::MsgRequestPreVote => { debug_assert!(m.log_term != 0, "{:?} log term can't be 0", m); @@ -1211,7 +1211,7 @@ impl Raft { // ...we haven't voted and we don't think there's a leader yet in this term... (self.vote == INVALID_ID && self.leader_id == INVALID_ID) || // ...or this is a PreVote for a future term... - (m.msg_type == MessageType::MsgRequestPreVote as i32 && m.term > self.term); + (m.get_msg_type() == MessageType::MsgRequestPreVote && m.term > self.term); // ...and we believe the candidate is up to date. if can_vote && self.raft_log.is_up_to_date(m.index, m.log_term) { // When responding to Msg{Pre,}Vote messages we include the term @@ -1224,18 +1224,20 @@ impl Raft { // The term in the original message and current local term are the // same in the case of regular votes, but different for pre-votes. self.log_vote_approve(&m); - let mut to_send = new_message(m.from, vote_resp_msg_type(m.msg_type()), None); + let mut to_send = + new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None); to_send.reject = false; to_send.term = m.term; self.send(to_send); - if m.msg_type() == MessageType::MsgRequestVote { + if m.get_msg_type() == MessageType::MsgRequestVote { // Only record real votes. self.election_elapsed = 0; self.vote = m.from; } } else { self.log_vote_reject(&m); - let mut to_send = new_message(m.from, vote_resp_msg_type(m.msg_type()), None); + let mut to_send = + new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None); to_send.reject = true; to_send.term = self.term; self.send(to_send); @@ -1326,7 +1328,7 @@ impl Raft { /// corresponding entry. #[inline(always)] pub fn begin_membership_change(&mut self, conf_change: &ConfChange) -> Result<()> { - if conf_change.change_type() != ConfChangeType::BeginMembershipChange { + if conf_change.get_change_type() != ConfChangeType::BeginMembershipChange { return Err(Error::ViolatesContract(format!( "{:?} != BeginMembershipChange", conf_change.change_type @@ -1369,7 +1371,7 @@ impl Raft { /// * `ConfChange.start_index` value should not exist. #[inline(always)] pub fn finalize_membership_change(&mut self, conf_change: &ConfChange) -> Result<()> { - if conf_change.change_type() != ConfChangeType::FinalizeMembershipChange { + if conf_change.get_change_type() != ConfChangeType::FinalizeMembershipChange { return Err(Error::ViolatesContract(format!( "{:?} != BeginMembershipChange", conf_change.change_type @@ -1419,7 +1421,7 @@ impl Raft { msg_index = m.index, term = self.term; "tag" => &self.tag, - "msg type" => ?m.msg_type(), + "msg type" => ?m.get_msg_type(), ); } @@ -1436,7 +1438,7 @@ impl Raft { msg_index = m.index, term = self.term; "tag" => &self.tag, - "msg type" => ?m.msg_type(), + "msg type" => ?m.get_msg_type(), ); } @@ -1561,7 +1563,7 @@ impl Raft { to_send.set_msg_type(MessageType::MsgReadIndexResp); to_send.to = req.from; to_send.index = rs.index; - to_send.entries = req.take_entries(); + to_send.set_entries(req.take_entries()); more_to_send.push(to_send); } } @@ -1681,7 +1683,7 @@ impl Raft { } let mut prs = self.take_prs(); - match m.msg_type() { + match m.get_msg_type() { MessageType::MsgAppendResponse => { self.handle_append_response(m, &mut prs, old_paused, send_append, maybe_commit); } @@ -1719,7 +1721,7 @@ impl Raft { fn step_leader(&mut self, mut m: Message) -> Result<()> { // These message types do not require any progress for m.From. - match m.msg_type() { + match m.get_msg_type() { MessageType::MsgBeat => { self.bcast_heartbeat(); return Ok(()); @@ -1759,7 +1761,7 @@ impl Raft { } for (i, e) in m.mut_entries().iter_mut().enumerate() { - if e.entry_type() == EntryType::EntryConfChange { + if e.get_entry_type() == EntryType::EntryConfChange { if self.has_pending_conf() { info!( self.logger, @@ -1813,7 +1815,7 @@ impl Raft { to_send.set_msg_type(MessageType::MsgReadIndexResp); to_send.to = m.from; to_send.index = read_index; - to_send.entries = m.take_entries(); + to_send.set_entries(m.take_entries()); self.send(to_send); } } @@ -1833,7 +1835,7 @@ impl Raft { to_send.set_msg_type(MessageType::MsgReadIndexResp); to_send.to = m.from; to_send.index = self.raft_log.committed; - to_send.entries = m.take_entries(); + to_send.set_entries(m.take_entries()); self.send(to_send); } } @@ -1883,7 +1885,7 @@ impl Raft { // step_candidate is shared by state Candidate and PreCandidate; the difference is // whether they respond to MsgRequestVote or MsgRequestPreVote. fn step_candidate(&mut self, m: Message) -> Result<()> { - match m.msg_type() { + match m.get_msg_type() { MessageType::MsgPropose => { info!( self.logger, @@ -1913,9 +1915,9 @@ impl Raft { // state Candidate, we may get stale MsgPreVoteResp messages in this term from // our pre-candidate state). if (self.state == StateRole::PreCandidate - && m.msg_type() != MessageType::MsgRequestPreVoteResponse) + && m.get_msg_type() != MessageType::MsgRequestPreVoteResponse) || (self.state == StateRole::Candidate - && m.msg_type() != MessageType::MsgRequestVoteResponse) + && m.get_msg_type() != MessageType::MsgRequestVoteResponse) { return Ok(()); } @@ -1928,7 +1930,7 @@ impl Raft { if !acceptance { " rejection" } else { "" }, from = from_id; "tag" => &self.id, - "msg type" => ?m.msg_type(), + "msg type" => ?m.get_msg_type(), "term" => self.term, ); self.register_vote(from_id, acceptance); @@ -1964,7 +1966,7 @@ impl Raft { } fn step_follower(&mut self, mut m: Message) -> Result<()> { - match m.msg_type() { + match m.get_msg_type() { MessageType::MsgPropose => { if self.leader_id == INVALID_ID { info!( @@ -2306,8 +2308,7 @@ impl Raft { conf_change.set_change_type(ConfChangeType::BeginMembershipChange); conf_change.set_configuration(config.into()); conf_change.start_index = destination_index; - let mut data = Vec::with_capacity(ProstMsg::encoded_len(&conf_change)); - conf_change.encode(&mut data).unwrap(); + let data = conf_change.write_to_bytes()?; let mut entry = Entry::default(); entry.set_entry_type(EntryType::EntryConfChange); entry.data = data; @@ -2315,7 +2316,7 @@ impl Raft { message.set_msg_type(MessageType::MsgPropose); message.from = self.id; message.index = destination_index; - message.entries = vec![entry]; + message.set_entries(vec![entry].into()); // `append_entry` sets term, index for us. self.step(message)?; Ok(()) diff --git a/src/raft_log.rs b/src/raft_log.rs index 1768af4df..a0b1427fc 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -540,7 +540,7 @@ mod test { use crate::raft_log::{self, RaftLog}; use crate::storage::MemStorage; use harness::testing_logger; - use prost::Message as ProstMsg; + use protobuf::Message as PbMessage; use slog::Logger; fn new_raft_log(s: MemStorage, l: &Logger) -> RaftLog { @@ -1006,7 +1006,7 @@ mod test { let (last, half) = (offset + num, offset + num / 2); let halfe = new_entry(half, half); - let halfe_size = ProstMsg::encoded_len(&halfe) as u64; + let halfe_size = halfe.compute_size() as u64; let store = MemStorage::new(); store diff --git a/src/raw_node.rs b/src/raw_node.rs index e2f7a4c4f..d955b42ef 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -32,7 +32,7 @@ use std::mem; -use prost::Message as ProstMsg; +use protobuf::Message as PbMessage; use crate::config::Config; use crate::default_logger; @@ -295,7 +295,7 @@ impl RawNode { let mut e = Entry::default(); e.data = data; e.context = context; - m.entries = vec![e]; + m.set_entries(vec![e].into()); self.raft.step(m) } @@ -309,15 +309,14 @@ impl RawNode { /// ProposeConfChange proposes a config change. #[cfg_attr(feature = "cargo-clippy", allow(clippy::needless_pass_by_value))] pub fn propose_conf_change(&mut self, context: Vec, cc: ConfChange) -> Result<()> { - let mut data = Vec::with_capacity(ProstMsg::encoded_len(&cc)); - cc.encode(&mut data)?; + let data = cc.write_to_bytes()?; let mut m = Message::default(); m.set_msg_type(MessageType::MsgPropose); let mut e = Entry::default(); e.set_entry_type(EntryType::EntryConfChange); e.data = data; e.context = context; - m.entries = vec![e]; + m.set_entries(vec![e].into()); self.raft.step(m) } @@ -330,14 +329,15 @@ impl RawNode { /// For a safe interface for these directly call `this.raft.begin_membership_change(entry)` or /// `this.raft.finalize_membership_change(entry)` respectively. pub fn apply_conf_change(&mut self, cc: &ConfChange) -> Result { - if cc.node_id == INVALID_ID && cc.change_type() != ConfChangeType::BeginMembershipChange { + if cc.node_id == INVALID_ID && cc.get_change_type() != ConfChangeType::BeginMembershipChange + { let mut cs = ConfState::default(); cs.nodes = self.raft.prs().voter_ids().iter().cloned().collect(); cs.learners = self.raft.prs().learner_ids().iter().cloned().collect(); return Ok(cs); } let nid = cc.node_id; - match cc.change_type() { + match cc.get_change_type() { ConfChangeType::AddNode => self.raft.add_node(nid)?, ConfChangeType::AddLearnerNode => self.raft.add_learner(nid)?, ConfChangeType::RemoveNode => self.raft.remove_node(nid)?, @@ -353,10 +353,10 @@ impl RawNode { /// Step advances the state machine using the given message. pub fn step(&mut self, m: Message) -> Result<()> { // ignore unexpected local messages receiving over network - if is_local_msg(m.msg_type()) { + if is_local_msg(m.get_msg_type()) { return Err(Error::StepLocalMsg); } - if self.raft.prs().get(m.from).is_some() || !is_response_msg(m.msg_type()) { + if self.raft.prs().get(m.from).is_some() || !is_response_msg(m.get_msg_type()) { return self.raft.step(m); } Err(Error::StepPeerNotFound) @@ -493,7 +493,7 @@ impl RawNode { m.set_msg_type(MessageType::MsgReadIndex); let mut e = Entry::default(); e.data = rctx; - m.entries = vec![e]; + m.set_entries(vec![e].into()); let _ = self.raft.step(m); } diff --git a/src/storage.rs b/src/storage.rs index a46166160..afd5f2d1c 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -483,7 +483,7 @@ impl Storage for MemStorage { mod test { use std::panic::{self, AssertUnwindSafe}; - use prost::Message as ProstMsg; + use protobuf::Message as PbMessage; use crate::eraftpb::{ConfState, Entry, Snapshot}; use crate::errors::{Error as RaftError, StorageError}; @@ -498,8 +498,8 @@ mod test { e } - fn size_of(m: &T) -> u32 { - ProstMsg::encoded_len(m) as u32 + fn size_of(m: &T) -> u32 { + m.compute_size() as u32 } fn new_snapshot(index: u64, term: u64, nodes: Vec) -> Snapshot { diff --git a/src/util.rs b/src/util.rs index 0aee88b12..678336515 100644 --- a/src/util.rs +++ b/src/util.rs @@ -17,7 +17,7 @@ use std::u64; use crate::eraftpb::{Entry, Message}; -use prost::Message as ProstMsg; +use protobuf::Message as PbMessage; /// A number to represent that there is no limit. pub const NO_LIMIT: u64 = u64::MAX; @@ -53,7 +53,7 @@ pub const NO_LIMIT: u64 = u64::MAX; /// limit_size(&mut entries, Some(0)); /// assert_eq!(entries.len(), 1); /// ``` -pub fn limit_size(entries: &mut Vec, max: Option) { +pub fn limit_size(entries: &mut Vec, max: Option) { if entries.len() <= 1 { return; } @@ -67,10 +67,10 @@ pub fn limit_size(entries: &mut Vec, max: Option) { .iter() .take_while(|&e| { if size == 0 { - size += ProstMsg::encoded_len(e) as u64; + size += u64::from(e.compute_size()); true } else { - size += ProstMsg::encoded_len(e) as u64; + size += u64::from(e.compute_size()); size <= max } }) diff --git a/tests/integration_cases/test_membership_changes.rs b/tests/integration_cases/test_membership_changes.rs index 5e6762509..cb1f1ff97 100644 --- a/tests/integration_cases/test_membership_changes.rs +++ b/tests/integration_cases/test_membership_changes.rs @@ -18,7 +18,7 @@ use harness::testing_logger; use harness::Network; use hashbrown::{HashMap, HashSet}; -use prost::Message as ProstMsg; +use protobuf::Message as PbMessage; use raft::{ eraftpb::{ ConfChange, ConfChangeType, ConfState, Entry, EntryType, Message, MessageType, Snapshot, @@ -1179,7 +1179,7 @@ mod intermingled_config_changes { .raft_log .entries(5, 1) .unwrap()[0] - .entry_type(), + .get_entry_type(), EntryType::EntryNormal ); @@ -1485,9 +1485,10 @@ impl Scenario { peer.mut_store().wl().append(&entries).unwrap(); let mut found = false; for entry in &entries { - if entry.entry_type() == EntryType::EntryConfChange { - let conf_change = ConfChange::decode(&entry.data)?; - if conf_change.change_type() == entry_type { + if entry.get_entry_type() == EntryType::EntryConfChange { + let mut conf_change = ConfChange::default(); + conf_change.merge_from_bytes(&entry.data)?; + if conf_change.get_change_type() == entry_type { found = true; match entry_type { ConfChangeType::BeginMembershipChange => { @@ -1604,9 +1605,10 @@ impl Scenario { .raft_log .slice(index, index + 1, None) .unwrap()[0]; - assert_eq!(entry.entry_type(), EntryType::EntryConfChange); - let conf_change = ConfChange::decode(&entry.data).unwrap(); - assert_eq!(conf_change.change_type(), entry_type); + assert_eq!(entry.get_entry_type(), EntryType::EntryConfChange); + let mut conf_change = ConfChange::default(); + conf_change.merge_from_bytes(&entry.data).unwrap(); + assert_eq!(conf_change.get_change_type(), entry_type); } } @@ -1660,8 +1662,7 @@ fn begin_entry<'a>( index: u64, ) -> Entry { let conf_change = begin_conf_change(voters, learners, index); - let mut data = Vec::with_capacity(ProstMsg::encoded_len(&conf_change)); - conf_change.encode(&mut data).unwrap(); + let data = conf_change.write_to_bytes().unwrap(); let mut entry = Entry::default(); entry.set_entry_type(EntryType::EntryConfChange); entry.data = data; @@ -1680,7 +1681,7 @@ fn build_propose_change_message<'a>( message.to = recipient; message.set_msg_type(MessageType::MsgPropose); message.index = index; - message.entries = vec![begin_entry]; + message.entries = vec![begin_entry].into(); message } @@ -1689,8 +1690,7 @@ fn build_propose_add_node_message(recipient: u64, added_id: u64, index: u64) -> let mut conf_change = ConfChange::default(); conf_change.set_change_type(ConfChangeType::AddNode); conf_change.node_id = added_id; - let mut data = Vec::with_capacity(ProstMsg::encoded_len(&conf_change)); - conf_change.encode(&mut data).unwrap(); + let data = conf_change.write_to_bytes().unwrap(); let mut entry = Entry::default(); entry.set_entry_type(EntryType::EntryConfChange); entry.data = data; @@ -1701,6 +1701,6 @@ fn build_propose_add_node_message(recipient: u64, added_id: u64, index: u64) -> message.to = recipient; message.set_msg_type(MessageType::MsgPropose); message.index = index; - message.entries = vec![add_nodes_entry]; + message.entries = vec![add_nodes_entry].into(); message } diff --git a/tests/integration_cases/test_raft.rs b/tests/integration_cases/test_raft.rs index e8deea943..f842bb197 100644 --- a/tests/integration_cases/test_raft.rs +++ b/tests/integration_cases/test_raft.rs @@ -31,7 +31,7 @@ use std::panic::{self, AssertUnwindSafe}; use harness::*; use hashbrown::HashSet; -use prost::Message as ProstMsg; +use protobuf::Message as PbMessage; use raft::eraftpb::*; use raft::storage::MemStorage; @@ -352,7 +352,7 @@ fn test_progress_paused() { m.set_msg_type(MessageType::MsgPropose); let mut e = Entry::default(); e.data = b"some_data".to_vec(); - m.entries = vec![e]; + m.entries = vec![e].into(); raft.step(m.clone()).expect(""); raft.step(m.clone()).expect(""); raft.step(m.clone()).expect(""); @@ -633,12 +633,12 @@ fn test_vote_from_any_state_for_type(vt: MessageType, l: &Logger) { ); let resp = &r.msgs[0]; assert_eq!( - resp.msg_type(), + resp.get_msg_type(), vote_resp_msg_type(vt), "{:?},{:?}: response message is {:?}, want {:?}", vt, state, - resp.msg_type(), + resp.get_msg_type(), vote_resp_msg_type(vt) ); assert!(!resp.reject, "{:?},{:?}: unexpected rejection", vt, state); @@ -724,7 +724,7 @@ fn test_log_replicatioin() { let ents: Vec = ents.drain(..).filter(|e| !e.data.is_empty()).collect(); for (k, m) in msgs .iter() - .filter(|m| m.msg_type() == MessageType::MsgPropose) + .filter(|m| m.get_msg_type() == MessageType::MsgPropose) .enumerate() { if ents[k].data != m.entries[0].data { @@ -943,7 +943,7 @@ fn test_candidate_concede() { // send a proposal to 3 to flush out a MsgAppend to 1 let data = "force follower"; let mut m = new_message(3, 3, MessageType::MsgPropose, 0); - m.entries = vec![new_entry(0, 0, Some(data))]; + m.entries = vec![new_entry(0, 0, Some(data))].into(); tt.send(vec![m]); // send heartbeat; flush out commit tt.send(vec![new_message(3, 3, MessageType::MsgBeat, 0)]); @@ -989,7 +989,7 @@ fn test_old_messages() { // pretend we're an old leader trying to make progress; this entry is expected to be ignored. let mut m = new_message(2, 1, MessageType::MsgAppend, 0); m.term = 2; - m.entries = vec![empty_entry(2, 3)]; + m.entries = vec![empty_entry(2, 3)].into(); tt.send(vec![m]); // commit a new entry tt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); @@ -1268,11 +1268,11 @@ fn test_handle_heartbeat() { if m.len() != 1 { panic!("#{}: msg count = {}, want 1", i, m.len()); } - if m[0].msg_type() != MessageType::MsgHeartbeatResponse { + if m[0].get_msg_type() != MessageType::MsgHeartbeatResponse { panic!( "#{}: type = {:?}, want MsgHeartbeatResponse", i, - m[0].msg_type() + m[0].get_msg_type() ); } } @@ -1298,14 +1298,14 @@ fn test_handle_heartbeat_resp() { .expect(""); let mut msgs = sm.read_messages(); assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].msg_type(), MessageType::MsgAppend); + assert_eq!(msgs[0].get_msg_type(), MessageType::MsgAppend); // A second heartbeat response generates another MsgApp re-send sm.step(new_message(2, 0, MessageType::MsgHeartbeatResponse, 0)) .expect(""); msgs = sm.read_messages(); assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].msg_type(), MessageType::MsgAppend); + assert_eq!(msgs[0].get_msg_type(), MessageType::MsgAppend); // Once we have an MsgAppResp, heartbeats no longer send MsgApp. let mut m = new_message(2, 0, MessageType::MsgAppendResponse, 0); @@ -1346,7 +1346,7 @@ fn test_raft_frees_read_only_mem() { sm.step(m).expect(""); let msgs = sm.read_messages(); assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].msg_type(), MessageType::MsgHeartbeat); + assert_eq!(msgs[0].get_msg_type(), MessageType::MsgHeartbeat); assert_eq!(msgs[0].context, &vec_ctx[..]); assert_eq!(sm.read_only.read_index_queue.len(), 1); assert_eq!(sm.read_only.pending_read_index.len(), 1); @@ -1387,14 +1387,14 @@ fn test_msg_append_response_wait_reset() { // A new command is now proposed on node 1. m = new_message(1, 0, MessageType::MsgPropose, 0); - m.entries = vec![empty_entry(0, 0)]; + m.entries = vec![empty_entry(0, 0)].into(); sm.step(m).expect(""); // The command is broadcast to all nodes not in the wait state. // Node 2 left the wait state due to its MsgAppResp, but node 3 is still waiting. let mut msgs = sm.read_messages(); assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].msg_type(), MessageType::MsgAppend); + assert_eq!(msgs[0].get_msg_type(), MessageType::MsgAppend); assert_eq!(msgs[0].to, 2); assert_eq!(msgs[0].entries.len(), 1); assert_eq!(msgs[0].entries[0].index, 3); @@ -1405,7 +1405,7 @@ fn test_msg_append_response_wait_reset() { sm.step(m).expect(""); msgs = sm.read_messages(); assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].msg_type(), MessageType::MsgAppend); + assert_eq!(msgs[0].get_msg_type(), MessageType::MsgAppend); assert_eq!(msgs[0].to, 3); assert_eq!(msgs[0].entries.len(), 2); assert_eq!(msgs[0].entries[0].index, 2); @@ -1466,11 +1466,11 @@ fn test_recv_msg_request_vote_for_type(msg_type: MessageType, l: &Logger) { if msgs.len() != 1 { panic!("#{}: msgs count = {}, want 1", j, msgs.len()); } - if msgs[0].msg_type() != vote_resp_msg_type(msg_type) { + if msgs[0].get_msg_type() != vote_resp_msg_type(msg_type) { panic!( "#{}: m.type = {:?}, want {:?}", j, - msgs[0].msg_type(), + msgs[0].get_msg_type(), vote_resp_msg_type(msg_type) ); } @@ -2565,11 +2565,11 @@ fn test_bcast_beat() { cmp::min(sm.raft_log.committed, sm.prs().get(3).unwrap().matched), ); for (i, m) in msgs.drain(..).enumerate() { - if m.msg_type() != MessageType::MsgHeartbeat { + if m.get_msg_type() != MessageType::MsgHeartbeat { panic!( "#{}: type = {:?}, want = {:?}", i, - m.msg_type(), + m.get_msg_type(), MessageType::MsgHeartbeat ); } @@ -2622,11 +2622,11 @@ fn test_recv_msg_beat() { panic!("#{}: msg count = {}, want {}", i, msgs.len(), w_msg); } for m in msgs { - if m.msg_type() != MessageType::MsgHeartbeat { + if m.get_msg_type() != MessageType::MsgHeartbeat { panic!( "#{}: msg.type = {:?}, want {:?}", i, - m.msg_type(), + m.get_msg_type(), MessageType::MsgHeartbeat ); } @@ -2711,7 +2711,7 @@ fn test_send_append_for_progress_probe() { // consume the heartbeat let msg = r.read_messages(); assert_eq!(msg.len(), 1); - assert_eq!(msg[0].msg_type(), MessageType::MsgHeartbeat); + assert_eq!(msg[0].get_msg_type(), MessageType::MsgHeartbeat); } // a heartbeat response will allow another message to be sent @@ -2848,7 +2848,7 @@ fn test_provide_snap() { let msgs = sm.read_messages(); assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].msg_type(), MessageType::MsgSnapshot); + assert_eq!(msgs[0].get_msg_type(), MessageType::MsgSnapshot); } #[test] @@ -3142,8 +3142,7 @@ fn test_commit_after_remove_node() -> Result<()> { let mut cc = ConfChange::default(); cc.set_change_type(ConfChangeType::RemoveNode); cc.node_id = 2; - let mut ccdata = Vec::with_capacity(ProstMsg::encoded_len(&cc)); - cc.encode(&mut ccdata).unwrap(); + let ccdata = cc.write_to_bytes().unwrap(); e.data = ccdata; m.mut_entries().push(e); r.step(m).expect(""); @@ -3164,16 +3163,16 @@ fn test_commit_after_remove_node() -> Result<()> { r.step(m).expect(""); let ents = next_ents(&mut r, &s); assert_eq!(ents.len(), 2); - assert_eq!(ents[0].entry_type(), EntryType::EntryNormal); + assert_eq!(ents[0].get_entry_type(), EntryType::EntryNormal); assert!(ents[0].data.is_empty()); - assert_eq!(ents[1].entry_type(), EntryType::EntryConfChange); + assert_eq!(ents[1].get_entry_type(), EntryType::EntryConfChange); // Apply the config change. This reduces quorum requirements so the // pending command can now commit. r.remove_node(2)?; let ents = next_ents(&mut r, &s); assert_eq!(ents.len(), 1); - assert_eq!(ents[0].entry_type(), EntryType::EntryNormal); + assert_eq!(ents[0].get_entry_type(), EntryType::EntryNormal); assert_eq!(ents[0].data, b"hello"); Ok(()) diff --git a/tests/integration_cases/test_raft_paper.rs b/tests/integration_cases/test_raft_paper.rs index e5b459858..baca37239 100644 --- a/tests/integration_cases/test_raft_paper.rs +++ b/tests/integration_cases/test_raft_paper.rs @@ -38,7 +38,7 @@ pub fn commit_noop_entry(r: &mut Interface, s: &MemStorage) { // simulate the response of MsgAppend let msgs = r.read_messages(); for m in msgs { - assert_eq!(m.msg_type(), MessageType::MsgAppend); + assert_eq!(m.get_msg_type(), MessageType::MsgAppend); assert_eq!(m.entries.len(), 1); assert!(m.entries[0].data.is_empty()); r.step(accept_and_reply(&m)).expect(""); @@ -55,7 +55,7 @@ pub fn commit_noop_entry(r: &mut Interface, s: &MemStorage) { } fn accept_and_reply(m: &Message) -> Message { - assert_eq!(m.msg_type(), MessageType::MsgAppend); + assert_eq!(m.get_msg_type(), MessageType::MsgAppend); let mut reply = new_message(m.to, m.from, MessageType::MsgAppendResponse, 0); reply.term = m.term; reply.index = m.index + m.entries.len() as u64; @@ -462,8 +462,8 @@ fn test_leader_start_replication() { m }; let expect_msgs = vec![ - new_message_ext(1, 2, wents.clone()), - new_message_ext(1, 3, wents.clone()), + new_message_ext(1, 2, wents.clone().into()), + new_message_ext(1, 3, wents.clone().into()), ]; assert_eq!(msgs, expect_msgs); assert_eq!(r.raft_log.unstable_entries(), Some(&*wents)); @@ -499,7 +499,7 @@ fn test_leader_commit_entry() { msgs.sort_by_key(|m| format!("{:?}", m)); for (i, m) in msgs.drain(..).enumerate() { assert_eq!(i as u64 + 2, m.to); - assert_eq!(m.msg_type(), MessageType::MsgAppend); + assert_eq!(m.get_msg_type(), MessageType::MsgAppend); assert_eq!(m.commit, li + 1); } } @@ -630,7 +630,7 @@ fn test_follower_commit_entry() { m.log_term = 1; m.index = 1; m.commit = commit; - m.entries = ents.clone(); + m.entries = ents.clone().into(); r.step(m).expect(""); if r.raft_log.committed != commit { @@ -758,7 +758,7 @@ fn test_follower_append_entries() { m.term = 2; m.log_term = term; m.index = index; - m.entries = ents; + m.entries = ents.into(); r.step(m).expect(""); let g = r.raft_log.all_entries(); @@ -883,7 +883,7 @@ fn test_leader_sync_follower_log() { n.send(vec![m]); let mut m = new_message(1, 1, MessageType::MsgPropose, 0); - m.entries = vec![Entry::default()]; + m.entries = vec![Entry::default()].into(); n.send(vec![m]); let lead_str = ltoa(&n.peers[&1].raft_log); let follower_str = ltoa(&n.peers[&2].raft_log); @@ -912,7 +912,7 @@ fn test_vote_request() { m.term = wterm - 1; m.log_term = 1; // log-term must be greater than 0. m.index = 1; - m.entries = ents.clone(); + m.entries = ents.clone().into(); r.step(m).expect(""); r.read_messages(); @@ -926,12 +926,12 @@ fn test_vote_request() { panic!("#{}: msg count = {}, want 2", j, msgs.len()); } for (i, m) in msgs.iter().enumerate() { - if m.msg_type() != MessageType::MsgRequestVote { + if m.get_msg_type() != MessageType::MsgRequestVote { panic!( "#{}.{}: msg_type = {:?}, want {:?}", j, i, - m.msg_type(), + m.get_msg_type(), MessageType::MsgRequestVote ); } @@ -989,11 +989,11 @@ fn test_voter() { if msgs.len() != 1 { panic!("#{}: msg count = {}, want {}", i, msgs.len(), 1); } - if msgs[0].msg_type() != MessageType::MsgRequestVoteResponse { + if msgs[0].get_msg_type() != MessageType::MsgRequestVoteResponse { panic!( "#{}: msg_type = {:?}, want {:?}", i, - msgs[0].msg_type(), + msgs[0].get_msg_type(), MessageType::MsgRequestVoteResponse ); } diff --git a/tests/integration_cases/test_raw_node.rs b/tests/integration_cases/test_raw_node.rs index a4841710c..f97a3b998 100644 --- a/tests/integration_cases/test_raw_node.rs +++ b/tests/integration_cases/test_raw_node.rs @@ -26,7 +26,7 @@ // limitations under the License. use harness::*; -use prost::Message as ProstMsg; +use protobuf::{Message as PbMessage, ProtobufEnum as _}; use raft::eraftpb::*; use raft::storage::MemStorage; use raft::*; @@ -81,8 +81,7 @@ fn new_raw_node( #[test] fn test_raw_node_step() { let l = testing_logger().new(o!("test" => "sending_snapshot_set_pending_snapshot")); - for msg_t in 0..18 { - let msg_t = MessageType::from_i32(msg_t).unwrap(); + for msg_t in MessageType::values() { if vec![ // Vote messages with term 0 will cause panics. MessageType::MsgRequestVote, @@ -97,7 +96,7 @@ fn test_raw_node_step() { } let mut raw_node = new_raw_node(1, vec![1], 10, 1, new_storage(), &l); - let res = raw_node.step(new_message(0, 0, msg_t, 0)); + let res = raw_node.step(new_message(0, 0, *msg_t, 0)); // local msg should be ignored. if vec![ MessageType::MsgBeat, @@ -193,8 +192,8 @@ fn test_raw_node_propose_and_conf_change() { raw_node.propose(vec![], b"somedata".to_vec()).expect(""); let cc = conf_change(ConfChangeType::AddNode, 2); - ccdata.reserve_exact(ProstMsg::encoded_len(&cc)); - cc.encode(&mut ccdata).unwrap(); + ccdata.reserve_exact(cc.compute_size() as usize); + cc.write_to_vec(&mut ccdata).unwrap(); raw_node.propose_conf_change(vec![], cc).expect(""); proposed = true; @@ -212,7 +211,7 @@ fn test_raw_node_propose_and_conf_change() { let entries = s.entries(last_index - 1, last_index + 1, None).unwrap(); assert_eq!(entries.len(), 2); assert_eq!(entries[0].data, b"somedata"); - assert_eq!(entries[1].entry_type(), EntryType::EntryConfChange); + assert_eq!(entries[1].get_entry_type(), EntryType::EntryConfChange); assert_eq!(entries[1].data, &*ccdata); } @@ -239,8 +238,9 @@ fn test_raw_node_propose_add_duplicate_node() { let rd = raw_node.ready(); s.wl().append(rd.entries()).expect(""); for e in rd.committed_entries.as_ref().unwrap() { - if e.entry_type() == EntryType::EntryConfChange { - let conf_change = ConfChange::decode(&e.data).unwrap(); + if e.get_entry_type() == EntryType::EntryConfChange { + let mut conf_change = ConfChange::default(); + conf_change.merge_from_bytes(&e.data).unwrap(); raw_node.apply_conf_change(&conf_change).ok(); } } @@ -248,8 +248,7 @@ fn test_raw_node_propose_add_duplicate_node() { }; let cc1 = conf_change(ConfChangeType::AddNode, 1); - let mut ccdata1 = Vec::with_capacity(ProstMsg::encoded_len(&cc1)); - cc1.encode(&mut ccdata1).unwrap(); + let ccdata1 = cc1.write_to_bytes().unwrap(); propose_conf_change_and_apply(cc1.clone()); // try to add the same node again @@ -257,8 +256,7 @@ fn test_raw_node_propose_add_duplicate_node() { // the new node join should be ok let cc2 = conf_change(ConfChangeType::AddNode, 2); - let mut ccdata2 = Vec::with_capacity(ProstMsg::encoded_len(&cc2)); - cc2.encode(&mut ccdata2).unwrap(); + let ccdata2 = cc2.write_to_bytes().unwrap(); propose_conf_change_and_apply(cc2); let last_index = s.last_index().unwrap(); @@ -303,7 +301,8 @@ fn test_raw_node_propose_add_learner_node() -> Result<()> { ); let e = &rd.committed_entries.as_ref().unwrap()[0]; - let conf_change = ConfChange::decode(&e.data).unwrap(); + let mut conf_change = ConfChange::default(); + conf_change.merge_from_bytes(&e.data).unwrap(); let conf_state = raw_node.apply_conf_change(&conf_change)?; assert_eq!(conf_state.nodes, vec![1]); assert_eq!(conf_state.learners, vec![2]); @@ -478,9 +477,7 @@ fn test_skip_bcast_commit() { let mut cc = ConfChange::default(); cc.set_change_type(ConfChangeType::RemoveNode); cc.node_id = 3; - let mut data = Vec::with_capacity(ProstMsg::encoded_len(&cc)); - data.reserve_exact(ProstMsg::encoded_len(&cc)); - cc.encode(&mut data).unwrap(); + let data = cc.write_to_bytes().unwrap(); let mut cc_entry = Entry::default(); cc_entry.set_entry_type(EntryType::EntryConfChange); cc_entry.data = data; diff --git a/tests/test_util/mod.rs b/tests/test_util/mod.rs index cba850d01..7e7a7533c 100644 --- a/tests/test_util/mod.rs +++ b/tests/test_util/mod.rs @@ -137,7 +137,7 @@ pub fn new_message_with_entries(from: u64, to: u64, t: MessageType, ents: Vec Message { for _ in 0..n { ents.push(new_entry(0, 0, SOME_DATA)); } - m.entries = ents; + m.entries = ents.into(); } m }