From d1c2aff44ea305b9f782b1bf82d9a1f687d2f9c4 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Wed, 12 Apr 2023 13:23:13 +0200 Subject: [PATCH 01/11] Add structure of operation handler --- Cargo.lock | 68 +++++------ .../operation_handler/internal_messages.rs | 7 ++ .../src/handlers/operation_handler/mod.rs | 109 ++++-------------- .../handlers/operation_handler/propagation.rs | 38 ++++++ .../handlers/operation_handler/retrieval.rs | 97 ++++++++++++++++ 5 files changed, 198 insertions(+), 121 deletions(-) create mode 100644 massa-protocol-worker-2/src/handlers/operation_handler/internal_messages.rs create mode 100644 massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs create mode 100644 massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs diff --git a/Cargo.lock b/Cargo.lock index 8e236bffe4a..0bd07b96433 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -204,7 +204,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2 1.0.56", "quote 1.0.26", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -215,7 +215,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2 1.0.56", "quote 1.0.26", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -237,9 +237,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.6.12" +version = "0.6.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "349f8ccfd9221ee7d1f3d4b33e1f8319b3a81ed8f61f2ea40b37b859794b4491" +checksum = "6e5abc76e6ef57bf438c067d2898481475994102eeb5d662a0b2162d0c2fdcf1" dependencies = [ "async-trait", "axum-core", @@ -265,9 +265,9 @@ dependencies = [ [[package]] name = "axum-core" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2f958c80c248b34b9a877a643811be8dbca03ca5ba827f2b63baf3a81e5fc4e" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" dependencies = [ "async-trait", "bytes", @@ -918,9 +918,9 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.7" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf2b3e8478797446514c91ef04bafcb59faba183e621ad488df88983cc14128c" +checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" dependencies = [ "cfg-if", "crossbeam-utils", @@ -1036,7 +1036,7 @@ dependencies = [ "proc-macro2 1.0.56", "quote 1.0.26", "scratch", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -1053,7 +1053,7 @@ checksum = "2345488264226bf682893e25de0769f3360aac9957980ec49361b083ddaa5bc5" dependencies = [ "proc-macro2 1.0.56", "quote 1.0.26", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -1106,9 +1106,9 @@ dependencies = [ [[package]] name = "dialoguer" -version = "0.10.3" +version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af3c796f3b0b408d9fd581611b47fa850821fcb84aa640b83a3c1a5be2d691f2" +checksum = "59c6f2989294b9a498d3ad5491a79c6deb604617378e1cdc4bfc1c1361fe2f87" dependencies = [ "console", "shell-words", @@ -1365,13 +1365,13 @@ dependencies = [ [[package]] name = "errno" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50d6a0976c999d473fe89ad888d5a284e55366d9dc9038b1ba2aa15128c4afa0" +checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" dependencies = [ "errno-dragonfly", "libc", - "windows-sys 0.45.0", + "windows-sys 0.48.0", ] [[package]] @@ -1525,7 +1525,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2 1.0.56", "quote 1.0.26", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -2841,7 +2841,7 @@ dependencies = [ [[package]] name = "massa_hash" version = "0.1.0" -source = "git+https://github.com/massalabs/massa.git#b27e8416efac19e8ec470e96bb49c74687092ac3" +source = "git+https://github.com/massalabs/massa.git#2e351ab46db270634fe81137dbc78d1ea9e10228" dependencies = [ "blake3", "bs58", @@ -3177,7 +3177,7 @@ dependencies = [ [[package]] name = "massa_serialization" version = "0.1.0" -source = "git+https://github.com/massalabs/massa.git#b27e8416efac19e8ec470e96bb49c74687092ac3" +source = "git+https://github.com/massalabs/massa.git#2e351ab46db270634fe81137dbc78d1ea9e10228" dependencies = [ "displaydoc", "nom", @@ -3205,7 +3205,7 @@ dependencies = [ [[package]] name = "massa_signature" version = "0.1.0" -source = "git+https://github.com/massalabs/massa.git#b27e8416efac19e8ec470e96bb49c74687092ac3" +source = "git+https://github.com/massalabs/massa.git#2e351ab46db270634fe81137dbc78d1ea9e10228" dependencies = [ "bs58", "displaydoc", @@ -3775,7 +3775,7 @@ dependencies = [ "pest_meta", "proc-macro2 1.0.56", "quote 1.0.26", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -4386,9 +4386,9 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" [[package]] name = "rustix" -version = "0.37.8" +version = "0.37.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1aef160324be24d31a62147fae491c14d2204a3865c7ca8c3b0d7f7bcb3ea635" +checksum = "85597d61f83914ddeba6a47b3b8ffe7365107221c2e557ed94426489fefb5f77" dependencies = [ "bitflags", "errno", @@ -4565,9 +4565,9 @@ checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0" [[package]] name = "serde" -version = "1.0.159" +version = "1.0.160" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c04e8343c3daeec41f58990b9d77068df31209f2af111e059e9fe9646693065" +checksum = "bb2f3770c8bce3bcda7e149193a069a0f4365bda1fa5cd88e03bca26afc1216c" dependencies = [ "serde_derive", ] @@ -4585,13 +4585,13 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.159" +version = "1.0.160" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c614d17805b093df4b147b51339e7e44bf05ef59fba1e45d83500bcfb4d8585" +checksum = "291a097c63d8497e00160b166a967a4a79c64f3facdd01cbd7502231688d77df" dependencies = [ "proc-macro2 1.0.56", "quote 1.0.26", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -4968,9 +4968,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.13" +version = "2.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c9da457c5285ac1f936ebd076af6dac17a61cfe7826f2076b4d015cf47bc8ec" +checksum = "fcf316d5356ed6847742d036f8a39c3b8435cac10bd528a4bd461928a6ab34d5" dependencies = [ "proc-macro2 1.0.56", "quote 1.0.26", @@ -5049,7 +5049,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ "proc-macro2 1.0.56", "quote 1.0.26", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -5158,7 +5158,7 @@ checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce" dependencies = [ "proc-macro2 1.0.56", "quote 1.0.26", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] @@ -5537,9 +5537,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.3.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1674845326ee10d37ca60470760d4288a6f80f304007d92e5c53bab78c9cfd79" +checksum = "5b55a3fef2a1e3b3a00ce878640918820d3c51081576ac657d23af9fc7928fdb" dependencies = [ "getrandom 0.2.9", ] @@ -6213,7 +6213,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2 1.0.56", "quote 1.0.26", - "syn 2.0.13", + "syn 2.0.14", ] [[package]] diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/internal_messages.rs b/massa-protocol-worker-2/src/handlers/operation_handler/internal_messages.rs new file mode 100644 index 00000000000..f5fc5d2c165 --- /dev/null +++ b/massa-protocol-worker-2/src/handlers/operation_handler/internal_messages.rs @@ -0,0 +1,7 @@ +use peernet::peer_id::PeerId; + +pub enum InternalMessage { + /// (From peer id (optional, can come from API or other modules)), endorsements) + /// TODO: Add data + PropagateOperations((Option, ())), +} diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/mod.rs b/massa-protocol-worker-2/src/handlers/operation_handler/mod.rs index 8349f3f232d..8c039f30b29 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/mod.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/mod.rs @@ -1,19 +1,20 @@ use std::thread::JoinHandle; -use crossbeam::{ - channel::{unbounded, Receiver, Sender}, - select, -}; -use massa_serialization::{DeserializeError, Deserializer}; +use crossbeam::channel::{unbounded, Receiver}; +use massa_pool_exports::PoolController; +use massa_storage::Storage; use peernet::{network_manager::SharedActiveConnections, peer_id::PeerId}; use self::{ - commands::OperationHandlerCommand, - messages::{OperationMessageDeserializer, OperationMessageDeserializerArgs}, + commands::OperationHandlerCommand, propagation::start_propagation_thread, + retrieval::start_retrieval_thread, }; pub mod commands; +mod internal_messages; mod messages; +mod propagation; +mod retrieval; pub(crate) use messages::{OperationMessage, OperationMessageSerializer}; @@ -24,90 +25,24 @@ pub struct OperationHandler { impl OperationHandler { pub fn new( + pool_controller: Box, + storage: Storage, active_connections: SharedActiveConnections, receiver: Receiver<(PeerId, u64, Vec)>, receiver_ext: Receiver, ) -> Self { - //TODO: Define real data - let (_internal_sender, internal_receiver): (Sender<()>, Receiver<()>) = unbounded(); - let operation_retrieval_thread = std::thread::spawn(move || { - //TODO: Real values - let mut operation_message_deserializer = - OperationMessageDeserializer::new(OperationMessageDeserializerArgs { - max_datastore_value_length: 10000, - max_function_name_length: 10000, - max_op_datastore_entry_count: 10000, - max_op_datastore_key_length: 100, - max_op_datastore_value_length: 10000, - max_operations: 10000, - max_operations_prefix_ids: 10000, - max_parameters_size: 10000, - }); - //TODO: Real logic - loop { - select! { - recv(receiver) -> msg => { - match msg { - Ok((peer_id, message_id, message)) => { - operation_message_deserializer.set_message_id(message_id); - let (rest, message) = operation_message_deserializer - .deserialize::(&message) - .unwrap(); - if !rest.is_empty() { - println!("Error: message not fully consumed"); - return; - } - println!("Received message from {:?}: {:?}", peer_id, message); - } - Err(err) => { - println!("Error: {:?}", err); - return; - } - } - }, - recv(receiver_ext) -> command => { - match command { - Ok(command) => { - println!("Received command: {:?}", command); - } - Err(err) => { - println!("Error: {:?}", err); - return; - } - } - } - } - } - }); - - let operation_propagation_thread = std::thread::spawn({ - let _active_connections = active_connections.clone(); - move || { - let _operation_message_serializer = OperationMessageSerializer::new(); - //TODO: Real logic - loop { - match internal_receiver.recv() { - Ok(_data) => { - // Example to send data - // { - // let active_connections = active_connections.read(); - // for (peer_id, connection) in active_connections.iter() { - // println!("Sending message to {:?}", peer_id); - // let buf = Vec::new(); - // operation_message_serializer.serialize(&data, &mut buf).unwrap(); - // connection.send_message(&buf); - // } - // } - println!("Received message"); - } - Err(err) => { - println!("Error: {:?}", err); - return; - } - } - } - } - }); + //TODO: Define bound channel + let (internal_sender, internal_receiver) = unbounded(); + let operation_retrieval_thread = start_retrieval_thread( + receiver, + receiver_ext, + pool_controller, + storage, + internal_sender, + ); + + let operation_propagation_thread = + start_propagation_thread(internal_receiver, active_connections); Self { operation_retrieval_thread: Some(operation_retrieval_thread), operation_propagation_thread: Some(operation_propagation_thread), diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs b/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs new file mode 100644 index 00000000000..b8354c04674 --- /dev/null +++ b/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs @@ -0,0 +1,38 @@ +use std::{collections::HashMap, thread::JoinHandle}; + +use crossbeam::channel::Receiver; +use massa_models::{endorsement::EndorsementId, prehash::PreHashSet}; +use peernet::{network_manager::SharedActiveConnections, peer_id::PeerId}; + +use crate::messages::MessagesSerializer; + +use super::{internal_messages::InternalMessage, OperationMessageSerializer}; + +struct PropagationThread { + //TODO: Add pruning + cache_by_peer: HashMap>, +} + +pub fn start_propagation_thread( + internal_receiver: Receiver, + active_connections: SharedActiveConnections, +) -> JoinHandle<()> { + std::thread::spawn(move || { + let endorsement_serializer = MessagesSerializer::new() + .with_operation_message_serializer(OperationMessageSerializer::new()); + let mut propagation_thread = PropagationThread { + cache_by_peer: HashMap::new(), + }; + loop { + match internal_receiver.recv() { + Ok(internal_message) => match internal_message { + _ => todo!(), + }, + Err(err) => { + println!("Error: {:?}", err); + return; + } + } + } + }) +} diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs b/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs new file mode 100644 index 00000000000..a896a06894e --- /dev/null +++ b/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs @@ -0,0 +1,97 @@ +use std::thread::JoinHandle; + +use crossbeam::{ + channel::{Receiver, Sender}, + select, +}; +use massa_pool_exports::PoolController; +use massa_serialization::{DeserializeError, Deserializer}; +use massa_storage::Storage; +use peernet::peer_id::PeerId; + +use super::{ + commands::OperationHandlerCommand, + internal_messages::InternalMessage, + messages::{OperationMessageDeserializer, OperationMessageDeserializerArgs}, +}; + +pub struct RetrievalThread { + receiver: Receiver<(PeerId, u64, Vec)>, + receiver_ext: Receiver, + pool_controller: Box, + storage: Storage, + internal_sender: Sender, +} + +impl RetrievalThread { + fn run(&mut self) { + //TODO: Real values + let mut operation_message_deserializer = + OperationMessageDeserializer::new(OperationMessageDeserializerArgs { + //TODO: Real value from config + max_operations_prefix_ids: u32::MAX, + max_operations: u32::MAX, + max_datastore_value_length: u64::MAX, + max_function_name_length: u16::MAX, + max_parameters_size: u32::MAX, + max_op_datastore_entry_count: u64::MAX, + max_op_datastore_key_length: u8::MAX, + max_op_datastore_value_length: u64::MAX, + }); + loop { + select! { + recv(self.receiver) -> msg => { + match msg { + Ok((peer_id, message_id, message)) => { + operation_message_deserializer.set_message_id(message_id); + let (rest, message) = operation_message_deserializer + .deserialize::(&message) + .unwrap(); + if !rest.is_empty() { + println!("Error: message not fully consumed"); + return; + } + match message { + _ => todo!() + } + } + Err(err) => { + println!("Error: {:?}", err); + return; + } + } + }, + recv(self.receiver_ext) -> command => { + match command { + Ok(command) => { + println!("Received command: {:?}", command); + } + Err(err) => { + println!("Error: {:?}", err); + return; + } + } + } + } + } + } +} + +pub fn start_retrieval_thread( + receiver: Receiver<(PeerId, u64, Vec)>, + receiver_ext: Receiver, + pool_controller: Box, + storage: Storage, + internal_sender: Sender, +) -> JoinHandle<()> { + std::thread::spawn(move || { + let mut retrieval_thread = RetrievalThread { + receiver, + receiver_ext, + pool_controller, + storage, + internal_sender, + }; + retrieval_thread.run(); + }) +} From 23735c65889c1e166f9359c91f1f08b7278f6594 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Thu, 13 Apr 2023 01:50:47 +0200 Subject: [PATCH 02/11] Update cache to use a simplified Lru and share it. Use the same channel for all communications to the propagator --- massa-protocol-exports-2/src/error.rs | 2 + massa-protocol-worker-2/Cargo.toml | 4 + massa-protocol-worker-2/src/connectivity.rs | 189 ++++++++++-------- massa-protocol-worker-2/src/controller.rs | 5 +- .../src/handlers/operation_handler/cache.rs | 22 ++ .../handlers/operation_handler/commands.rs | 10 - .../operation_handler/commands_propagation.rs | 7 + .../operation_handler/internal_messages.rs | 7 - .../src/handlers/operation_handler/mod.rs | 28 +-- .../handlers/operation_handler/propagation.rs | 52 +++-- .../handlers/operation_handler/retrieval.rs | 181 +++++++++++++---- massa-protocol-worker-2/src/lib.rs | 3 + massa-protocol-worker-2/src/sig_verifier.rs | 29 +++ 13 files changed, 366 insertions(+), 173 deletions(-) create mode 100644 massa-protocol-worker-2/src/handlers/operation_handler/cache.rs delete mode 100644 massa-protocol-worker-2/src/handlers/operation_handler/commands.rs create mode 100644 massa-protocol-worker-2/src/handlers/operation_handler/commands_propagation.rs delete mode 100644 massa-protocol-worker-2/src/handlers/operation_handler/internal_messages.rs create mode 100644 massa-protocol-worker-2/src/sig_verifier.rs diff --git a/massa-protocol-exports-2/src/error.rs b/massa-protocol-exports-2/src/error.rs index 8b9f1957034..97bb4079883 100644 --- a/massa-protocol-exports-2/src/error.rs +++ b/massa-protocol-exports-2/src/error.rs @@ -33,6 +33,8 @@ pub enum ProtocolError { MissingPeersError, /// Models error: {0} ModelsError(#[from] ModelsError), + /// Send error: {0} + SendError(String), /// Container inconsistency error: {0} ContainerInconsistencyError(String), /// Invalid operation error: {0} diff --git a/massa-protocol-worker-2/Cargo.toml b/massa-protocol-worker-2/Cargo.toml index 44e8acb1ea4..4173748cafd 100644 --- a/massa-protocol-worker-2/Cargo.toml +++ b/massa-protocol-worker-2/Cargo.toml @@ -14,16 +14,20 @@ nom = "7.1" num_enum = "0.5" peernet = { git = "https://github.com/massalabs/PeerNet" } tempfile = { version = "3.3", optional = true } # use with testing feature +rayon = "1.7.0" +lru = "0.10.0" # modules Custom massa_hash = { path = "../massa-hash" } massa_models = { path = "../massa-models" } +massa_logging = { path = "../massa-logging" } massa_protocol_exports_2 = { path = "../massa-protocol-exports-2" } massa_consensus_exports = { path = "../massa-consensus-exports" } massa_pool_exports = { path = "../massa-pool-exports" } massa_storage = { path = "../massa-storage" } massa_serialization = { path = "../massa-serialization" } massa_signature = { path = "../massa-signature" } +massa_time = { path = "../massa-time" } [dev-dependencies] tempfile = "3.3" diff --git a/massa-protocol-worker-2/src/connectivity.rs b/massa-protocol-worker-2/src/connectivity.rs index d8d6989d021..3c2dbe41e4f 100644 --- a/massa-protocol-worker-2/src/connectivity.rs +++ b/massa-protocol-worker-2/src/connectivity.rs @@ -6,21 +6,22 @@ use massa_pool_exports::PoolController; use massa_protocol_exports_2::{ProtocolConfig, ProtocolError}; use massa_serialization::U64VarIntDeserializer; use massa_storage::Storage; +use parking_lot::RwLock; use peernet::{ config::PeerNetConfiguration, network_manager::PeerNetManager, peer_id::PeerId, transports::{OutConnectionConfig, TcpOutConnectionConfig, TransportType}, }; -use std::ops::Bound::Included; use std::{collections::HashMap, net::SocketAddr, thread::JoinHandle, time::Duration}; +use std::{num::NonZeroUsize, ops::Bound::Included, sync::Arc}; use crate::{ controller::ProtocolControllerImpl, handlers::{ block_handler::BlockHandler, endorsement_handler::EndorsementHandler, - operation_handler::OperationHandler, + operation_handler::{cache::OperationCache, OperationHandler}, peer_handler::{fallback_function, MassaHandshake, PeerManagementHandler}, }, messages::MessagesHandler, @@ -51,102 +52,118 @@ pub fn start_connectivity_thread( let (sender_endorsements_ext, receiver_endorsements_ext) = unbounded(); let (sender_blocks_ext, receiver_blocks_ext) = unbounded(); - let handle = std::thread::spawn(move || { - let (mut peer_manager_handler, sender_peers) = PeerManagementHandler::new(initial_peers); - //TODO: Bound the channel - // Channels network <-> handlers - let (sender_operations, receiver_operations) = unbounded(); - let (sender_endorsements, receiver_endorsements) = unbounded(); - let (sender_blocks, receiver_blocks) = unbounded(); + let handle = std::thread::spawn({ + let sender_operations_ext = sender_operations_ext.clone(); + move || { + let (mut peer_manager_handler, sender_peers) = + PeerManagementHandler::new(initial_peers); + //TODO: Bound the channel + // Channels network <-> handlers + let (sender_operations, receiver_operations) = unbounded(); + let (sender_endorsements, receiver_endorsements) = unbounded(); + let (sender_blocks, receiver_blocks) = unbounded(); - // Register channels for handlers - let message_handlers: MessagesHandler = MessagesHandler { - sender_blocks, - sender_endorsements, - sender_operations, - sender_peers, - id_deserializer: U64VarIntDeserializer::new(Included(0), Included(u64::MAX)), - }; + // Register channels for handlers + let message_handlers: MessagesHandler = MessagesHandler { + sender_blocks, + sender_endorsements, + sender_operations, + sender_peers, + id_deserializer: U64VarIntDeserializer::new(Included(0), Included(u64::MAX)), + }; - let mut peernet_config = - PeerNetConfiguration::default(MassaHandshake::new(), message_handlers); - peernet_config.self_keypair = config.keypair; - peernet_config.fallback_function = Some(&fallback_function); - //TODO: Add the rest of the config - peernet_config.max_in_connections = config.max_in_connections; - peernet_config.max_out_connections = config.max_out_connections; + let mut peernet_config = + PeerNetConfiguration::default(MassaHandshake::new(), message_handlers); + peernet_config.self_keypair = config.keypair.clone(); + peernet_config.fallback_function = Some(&fallback_function); + //TODO: Add the rest of the config + peernet_config.max_in_connections = config.max_in_connections; + peernet_config.max_out_connections = config.max_out_connections; - let mut manager = PeerNetManager::new(peernet_config); + let mut manager = PeerNetManager::new(peernet_config); - // Start handlers - let mut operation_handler = OperationHandler::new( - manager.active_connections.clone(), - receiver_operations, - receiver_operations_ext, - ); - let mut endorsement_handler = EndorsementHandler::new( - pool_controller, - storage, - manager.active_connections.clone(), - receiver_endorsements, - receiver_endorsements_ext, - ); - let mut block_handler = BlockHandler::new( - manager.active_connections.clone(), - receiver_blocks, - receiver_blocks_ext, - ); + // Create cache outside of the op handler because it could be used by other handlers + //TODO: Add real config values + let operation_cache = Arc::new(RwLock::new(OperationCache::new( + NonZeroUsize::new(usize::MAX).unwrap(), + NonZeroUsize::new(usize::MAX).unwrap(), + ))); - for (addr, transport) in config.listeners { - manager.start_listener(transport, addr).expect(&format!( - "Failed to start listener {:?} of transport {:?} in protocol", - addr, transport - )); - } - //Try to connect to peers - loop { - select! { - recv(receiver) -> msg => { - if let Ok(ConnectivityCommand::Stop) = msg { - if let Some(handle) = peer_manager_handler.thread_join.take() { - handle.join().expect("Failed to join peer manager thread"); + // Start handlers + let mut operation_handler = OperationHandler::new( + pool_controller.clone(), + storage.clone_without_refs(), + config.clone(), + operation_cache, + manager.active_connections.clone(), + receiver_operations, + sender_operations_ext, + receiver_operations_ext, + ); + let mut endorsement_handler = EndorsementHandler::new( + pool_controller, + storage, + manager.active_connections.clone(), + receiver_endorsements, + receiver_endorsements_ext, + ); + let mut block_handler = BlockHandler::new( + manager.active_connections.clone(), + receiver_blocks, + receiver_blocks_ext, + ); + + for (addr, transport) in config.listeners { + manager.start_listener(transport, addr).expect(&format!( + "Failed to start listener {:?} of transport {:?} in protocol", + addr, transport + )); + } + //Try to connect to peers + loop { + select! { + recv(receiver) -> msg => { + if let Ok(ConnectivityCommand::Stop) = msg { + if let Some(handle) = peer_manager_handler.thread_join.take() { + handle.join().expect("Failed to join peer manager thread"); + } + operation_handler.stop(); + endorsement_handler.stop(); + block_handler.stop(); + break; } - operation_handler.stop(); - endorsement_handler.stop(); - block_handler.stop(); - break; } - } - default(Duration::from_millis(2000)) => { - // Check if we need to connect to peers - let nb_connection_to_try = { - let active_connections = manager.active_connections.read(); - let connection_to_try = active_connections.max_out_connections - active_connections.nb_out_connections; - if connection_to_try <= 0 { - continue; - } - connection_to_try - }; - // Get the best peers - { - let peer_db_read = peer_manager_handler.peer_db.read(); - let best_peers = peer_db_read.index_by_newest.iter().take(nb_connection_to_try as usize); - for (_timestamp, peer_id) in best_peers { - let peer_info = peer_db_read.peers.get(peer_id).unwrap(); - if peer_info.last_announce.listeners.is_empty() { + default(Duration::from_millis(2000)) => { + // Check if we need to connect to peers + let nb_connection_to_try = { + let active_connections = manager.active_connections.read(); + let connection_to_try = active_connections.max_out_connections - active_connections.nb_out_connections; + if connection_to_try <= 0 { continue; } - { - let active_connections = manager.active_connections.read(); - if active_connections.connections.contains_key(peer_id) { + connection_to_try + }; + // Get the best peers + { + let peer_db_read = peer_manager_handler.peer_db.read(); + let best_peers = peer_db_read.index_by_newest.iter().take(nb_connection_to_try as usize); + for (_timestamp, peer_id) in best_peers { + let peer_info = peer_db_read.peers.get(peer_id).unwrap(); + if peer_info.last_announce.listeners.is_empty() { continue; } - } - // We only manage TCP for now - let (addr, _transport) = peer_info.last_announce.listeners.iter().next().unwrap(); - manager.try_connect(*addr, Duration::from_millis(200), &OutConnectionConfig::Tcp(Box::new(TcpOutConnectionConfig {}))).unwrap(); + { + let active_connections = manager.active_connections.read(); + if active_connections.connections.contains_key(peer_id) { + continue; + } + } + // We only manage TCP for now + let (addr, _transport) = peer_info.last_announce.listeners.iter().next().unwrap(); + manager.try_connect(*addr, Duration::from_millis(200), &OutConnectionConfig::Tcp(Box::new(TcpOutConnectionConfig {}))).unwrap(); + }; }; - }; + } } } } diff --git a/massa-protocol-worker-2/src/controller.rs b/massa-protocol-worker-2/src/controller.rs index 09d270770aa..ce4dc03d2ca 100644 --- a/massa-protocol-worker-2/src/controller.rs +++ b/massa-protocol-worker-2/src/controller.rs @@ -10,7 +10,7 @@ use massa_storage::Storage; use crate::handlers::{ block_handler::commands::BlockHandlerCommand, endorsement_handler::commands::EndorsementHandlerCommand, - operation_handler::commands::OperationHandlerCommand, + operation_handler::commands_propagation::OperationHandlerCommand, }; #[derive(Clone)] @@ -72,8 +72,9 @@ impl ProtocolController for ProtocolControllerImpl { /// /// note: Full `OperationId` is replaced by a `OperationPrefixId` later by the worker. fn propagate_operations(&self, operations: Storage) -> Result<(), ProtocolError> { + let operations = operations.get_op_refs().clone(); self.sender_operation_handler - .send(OperationHandlerCommand::PropagateOperations(operations)) + .send(OperationHandlerCommand::AnnounceOperations(operations)) .map_err(|_| { ProtocolError::ChannelError("propagate_operations command send error".into()) }) diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/cache.rs b/massa-protocol-worker-2/src/handlers/operation_handler/cache.rs new file mode 100644 index 00000000000..308f160e9de --- /dev/null +++ b/massa-protocol-worker-2/src/handlers/operation_handler/cache.rs @@ -0,0 +1,22 @@ +use std::{collections::HashSet, num::NonZeroUsize, sync::Arc}; + +use lru::LruCache; +use massa_models::operation::{OperationId, OperationPrefixId}; +use parking_lot::RwLock; +use peernet::peer_id::PeerId; + +pub struct OperationCache { + pub checked_operations: LruCache, + pub ops_known_by_peer: LruCache>, +} + +impl OperationCache { + pub fn new(max_known_ops: NonZeroUsize, max_known_ops_by_peer: NonZeroUsize) -> Self { + Self { + checked_operations: LruCache::new(max_known_ops), + ops_known_by_peer: LruCache::new(max_known_ops_by_peer), + } + } +} + +pub type SharedOperationCache = Arc>; diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/commands.rs b/massa-protocol-worker-2/src/handlers/operation_handler/commands.rs deleted file mode 100644 index 2d5429f4d63..00000000000 --- a/massa-protocol-worker-2/src/handlers/operation_handler/commands.rs +++ /dev/null @@ -1,10 +0,0 @@ -use massa_storage::Storage; - -/// Commands that the operations handler can process -#[derive(Debug)] -pub enum OperationHandlerCommand { - /// Propagate operations (send batches) - /// note: `Set` are replaced with `OperationPrefixIds` - /// by the controller - PropagateOperations(Storage), -} diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/commands_propagation.rs b/massa-protocol-worker-2/src/handlers/operation_handler/commands_propagation.rs new file mode 100644 index 00000000000..1cdf62018f5 --- /dev/null +++ b/massa-protocol-worker-2/src/handlers/operation_handler/commands_propagation.rs @@ -0,0 +1,7 @@ +use massa_models::{operation::OperationId, prehash::PreHashSet}; + +#[derive(Clone)] +pub enum OperationHandlerCommand { + /// (From peer id (optional, can come from API or other modules)), operations ids) + AnnounceOperations(PreHashSet), +} diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/internal_messages.rs b/massa-protocol-worker-2/src/handlers/operation_handler/internal_messages.rs deleted file mode 100644 index f5fc5d2c165..00000000000 --- a/massa-protocol-worker-2/src/handlers/operation_handler/internal_messages.rs +++ /dev/null @@ -1,7 +0,0 @@ -use peernet::peer_id::PeerId; - -pub enum InternalMessage { - /// (From peer id (optional, can come from API or other modules)), endorsements) - /// TODO: Add data - PropagateOperations((Option, ())), -} diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/mod.rs b/massa-protocol-worker-2/src/handlers/operation_handler/mod.rs index 8c039f30b29..d04c1b07c4d 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/mod.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/mod.rs @@ -1,17 +1,18 @@ use std::thread::JoinHandle; -use crossbeam::channel::{unbounded, Receiver}; +use crossbeam::channel::{Receiver, Sender}; use massa_pool_exports::PoolController; +use massa_protocol_exports_2::ProtocolConfig; use massa_storage::Storage; use peernet::{network_manager::SharedActiveConnections, peer_id::PeerId}; use self::{ - commands::OperationHandlerCommand, propagation::start_propagation_thread, - retrieval::start_retrieval_thread, + cache::SharedOperationCache, commands_propagation::OperationHandlerCommand, + propagation::start_propagation_thread, retrieval::start_retrieval_thread, }; -pub mod commands; -mod internal_messages; +pub mod cache; +pub mod commands_propagation; mod messages; mod propagation; mod retrieval; @@ -27,22 +28,25 @@ impl OperationHandler { pub fn new( pool_controller: Box, storage: Storage, + config: ProtocolConfig, + cache: SharedOperationCache, active_connections: SharedActiveConnections, - receiver: Receiver<(PeerId, u64, Vec)>, - receiver_ext: Receiver, + receiver_network: Receiver<(PeerId, u64, Vec)>, + local_sender: Sender, + local_receiver: Receiver, ) -> Self { //TODO: Define bound channel - let (internal_sender, internal_receiver) = unbounded(); let operation_retrieval_thread = start_retrieval_thread( - receiver, - receiver_ext, + receiver_network, pool_controller, storage, - internal_sender, + config.clone(), + cache.clone(), + local_sender, ); let operation_propagation_thread = - start_propagation_thread(internal_receiver, active_connections); + start_propagation_thread(local_receiver, active_connections, config, cache); Self { operation_retrieval_thread: Some(operation_retrieval_thread), operation_propagation_thread: Some(operation_propagation_thread), diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs b/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs index b8354c04674..de8400c5570 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs @@ -1,32 +1,35 @@ -use std::{collections::HashMap, thread::JoinHandle}; +use std::thread::JoinHandle; use crossbeam::channel::Receiver; -use massa_models::{endorsement::EndorsementId, prehash::PreHashSet}; -use peernet::{network_manager::SharedActiveConnections, peer_id::PeerId}; +use massa_models::operation::OperationId; +use massa_protocol_exports_2::ProtocolConfig; +use peernet::network_manager::SharedActiveConnections; use crate::messages::MessagesSerializer; -use super::{internal_messages::InternalMessage, OperationMessageSerializer}; +use super::{ + cache::SharedOperationCache, commands_propagation::OperationHandlerCommand, + OperationMessageSerializer, +}; struct PropagationThread { + internal_receiver: Receiver, //TODO: Add pruning - cache_by_peer: HashMap>, + operations_to_announce: Vec, + config: ProtocolConfig, + cache: SharedOperationCache, } -pub fn start_propagation_thread( - internal_receiver: Receiver, - active_connections: SharedActiveConnections, -) -> JoinHandle<()> { - std::thread::spawn(move || { - let endorsement_serializer = MessagesSerializer::new() +impl PropagationThread { + fn run(&mut self) { + let operation_serializer = MessagesSerializer::new() .with_operation_message_serializer(OperationMessageSerializer::new()); - let mut propagation_thread = PropagationThread { - cache_by_peer: HashMap::new(), - }; loop { - match internal_receiver.recv() { + match self.internal_receiver.recv() { Ok(internal_message) => match internal_message { - _ => todo!(), + OperationHandlerCommand::AnnounceOperations(operations_ids) => { + self.operations_to_announce.extend(operations_ids); + } }, Err(err) => { println!("Error: {:?}", err); @@ -34,5 +37,22 @@ pub fn start_propagation_thread( } } } + } +} + +pub fn start_propagation_thread( + internal_receiver: Receiver, + active_connections: SharedActiveConnections, + config: ProtocolConfig, + cache: SharedOperationCache, +) -> JoinHandle<()> { + std::thread::spawn(move || { + let mut propagation_thread = PropagationThread { + internal_receiver, + operations_to_announce: Vec::new(), + config, + cache, + }; + propagation_thread.run(); }) } diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs b/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs index a896a06894e..fd03d347bf3 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs @@ -1,26 +1,37 @@ use std::thread::JoinHandle; -use crossbeam::{ - channel::{Receiver, Sender}, - select, +use crossbeam::channel::{Receiver, Sender}; +use massa_logging::massa_trace; +use massa_models::{ + operation::{OperationId, SecureShareOperation}, + prehash::{CapacityAllocator, PreHashMap, PreHashSet}, + secure_share::Id, + slot::Slot, + timeslots::get_block_slot_timestamp, }; use massa_pool_exports::PoolController; +use massa_protocol_exports_2::{ProtocolConfig, ProtocolError}; use massa_serialization::{DeserializeError, Deserializer}; use massa_storage::Storage; +use massa_time::MassaTime; use peernet::peer_id::PeerId; +use crate::sig_verifier::verify_sigs_batch; +use tracing::warn; + use super::{ - commands::OperationHandlerCommand, - internal_messages::InternalMessage, - messages::{OperationMessageDeserializer, OperationMessageDeserializerArgs}, + cache::SharedOperationCache, + commands_propagation::OperationHandlerCommand, + messages::{OperationMessage, OperationMessageDeserializer, OperationMessageDeserializerArgs}, }; pub struct RetrievalThread { receiver: Receiver<(PeerId, u64, Vec)>, - receiver_ext: Receiver, pool_controller: Box, + cache: SharedOperationCache, storage: Storage, - internal_sender: Sender, + config: ProtocolConfig, + internal_sender: Sender, } impl RetrievalThread { @@ -39,58 +50,148 @@ impl RetrievalThread { max_op_datastore_value_length: u64::MAX, }); loop { - select! { - recv(self.receiver) -> msg => { - match msg { - Ok((peer_id, message_id, message)) => { - operation_message_deserializer.set_message_id(message_id); - let (rest, message) = operation_message_deserializer - .deserialize::(&message) - .unwrap(); - if !rest.is_empty() { - println!("Error: message not fully consumed"); - return; - } - match message { - _ => todo!() - } - } - Err(err) => { - println!("Error: {:?}", err); - return; - } + match self.receiver.recv() { + Ok((peer_id, message_id, message)) => { + operation_message_deserializer.set_message_id(message_id); + let (rest, message) = operation_message_deserializer + .deserialize::(&message) + .unwrap(); + if !rest.is_empty() { + println!("Error: message not fully consumed"); + return; } - }, - recv(self.receiver_ext) -> command => { - match command { - Ok(command) => { - println!("Received command: {:?}", command); - } - Err(err) => { - println!("Error: {:?}", err); - return; + match message { + OperationMessage::Operations(ops) => { + if let Err(err) = self.note_operations_from_peer(ops, &peer_id) { + warn!("peer {} sent us critically incorrect operation, which may be an attack attempt by the remote peer or a loss of sync between us and the remote peer. Err = {}", peer_id, err); + //TODO: Ban + //let _ = self.ban_node(&node_id).await; + } } + _ => todo!(), } } + Err(err) => { + println!("Error: {:?}", err); + return; + } + } + } + } + + fn note_operations_from_peer( + &mut self, + operations: Vec, + source_peer_id: &PeerId, + ) -> Result<(), ProtocolError> { + massa_trace!("protocol.protocol_worker.note_operations_from_peer", { "peer": source_peer_id, "operations": operations }); + let length = operations.len(); + let mut new_operations = PreHashMap::with_capacity(length); + let mut received_ids = PreHashSet::with_capacity(length); + for operation in operations { + let operation_id = operation.id; + if operation.serialized_size() > self.config.max_serialized_operations_size_per_block { + return Err(ProtocolError::InvalidOperationError(format!( + "Operation {} exceeds max block size, maximum authorized {} bytes but found {} bytes", + operation_id, + operation.serialized_size(), + self.config.max_serialized_operations_size_per_block + ))); + }; + received_ids.insert(operation_id); + + // Check operation signature only if not already checked. + if !self.cache.read().checked_operations.contains(&operation_id) { + // check signature if the operation wasn't in `checked_operation` + new_operations.insert(operation_id, operation); + }; + } + + // optimized signature verification + verify_sigs_batch( + &new_operations + .iter() + .map(|(op_id, op)| (*op_id.get_hash(), op.signature, op.content_creator_pub_key)) + .collect::>(), + )?; + + { + // add to checked operations + let mut cache_write = self.cache.write(); + for op in new_operations.keys().copied() { + cache_write.checked_operations.put(op, ()); + } + + // add to known ops + if let Some(known_ops) = cache_write.ops_known_by_peer.get_mut(source_peer_id) { + known_ops.extend(received_ids.iter().map(|id| id.prefix())); } } + + if !new_operations.is_empty() { + // Store operation, claim locally + let mut ops = self.storage.clone_without_refs(); + ops.store_operations(new_operations.into_values().collect()); + + // Propagate operations when their expire period isn't `max_operations_propagation_time` old. + let mut ops_to_propagate = ops.clone(); + let operations_to_not_propagate = { + let now = MassaTime::now()?; + let read_operations = ops_to_propagate.read_operations(); + ops_to_propagate + .get_op_refs() + .iter() + .filter(|op_id| { + let expire_period = + read_operations.get(op_id).unwrap().content.expire_period; + let expire_period_timestamp = get_block_slot_timestamp( + self.config.thread_count, + self.config.t0, + self.config.genesis_timestamp, + Slot::new(expire_period, 0), + ); + match expire_period_timestamp { + Ok(slot_timestamp) => { + slot_timestamp + .saturating_add(self.config.max_operations_propagation_time) + < now + } + Err(_) => true, + } + }) + .copied() + .collect() + }; + ops_to_propagate.drop_operation_refs(&operations_to_not_propagate); + let to_announce: PreHashSet = + ops_to_propagate.get_op_refs().iter().copied().collect(); + self.internal_sender + .send(OperationHandlerCommand::AnnounceOperations(to_announce)) + .map_err(|err| ProtocolError::SendError(err.to_string()))?; + // Add to pool + self.pool_controller.add_operations(ops); + } + + Ok(()) } } pub fn start_retrieval_thread( receiver: Receiver<(PeerId, u64, Vec)>, - receiver_ext: Receiver, pool_controller: Box, storage: Storage, - internal_sender: Sender, + config: ProtocolConfig, + cache: SharedOperationCache, + internal_sender: Sender, ) -> JoinHandle<()> { std::thread::spawn(move || { let mut retrieval_thread = RetrievalThread { receiver, - receiver_ext, pool_controller, storage, internal_sender, + cache, + config, }; retrieval_thread.run(); }) diff --git a/massa-protocol-worker-2/src/lib.rs b/massa-protocol-worker-2/src/lib.rs index 36d376dc631..a9b9fe4b010 100644 --- a/massa-protocol-worker-2/src/lib.rs +++ b/massa-protocol-worker-2/src/lib.rs @@ -1,8 +1,11 @@ +#![feature(map_try_insert)] + mod connectivity; mod controller; mod handlers; mod manager; mod messages; +mod sig_verifier; mod worker; pub use worker::start_protocol_controller; diff --git a/massa-protocol-worker-2/src/sig_verifier.rs b/massa-protocol-worker-2/src/sig_verifier.rs new file mode 100644 index 00000000000..1ed1c470209 --- /dev/null +++ b/massa-protocol-worker-2/src/sig_verifier.rs @@ -0,0 +1,29 @@ +// Copyright (c) 2022 MASSA LABS + +//! Optimized batch signature verifier + +use massa_hash::Hash; +use massa_protocol_exports_2::ProtocolError; +use massa_signature::{verify_signature_batch, PublicKey, Signature}; +use rayon::{prelude::ParallelIterator, slice::ParallelSlice}; + +/// Limit for small batch optimization +const SMALL_BATCH_LIMIT: usize = 2; + +/// Efficiently verifies a batch of signatures in parallel. +/// Returns an error if at least one of them fails to verify. +pub fn verify_sigs_batch(ops: &[(Hash, Signature, PublicKey)]) -> Result<(), ProtocolError> { + // if it's a small batch, use single-core verification + if ops.len() <= SMALL_BATCH_LIMIT { + return verify_signature_batch(ops).map_err(|_err| ProtocolError::WrongSignature); + } + + // otherwise, use parallel batch verif + + // compute chunk size for parallelization + let chunk_size = std::cmp::max(1, ops.len() / rayon::current_num_threads()); + // process chunks in parallel + ops.par_chunks(chunk_size) + .try_for_each(verify_signature_batch) + .map_err(|_err| ProtocolError::WrongSignature) +} From 434ac4abadd70dacf2e251cbf6e5d59010aa6772 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Thu, 13 Apr 2023 02:21:01 +0200 Subject: [PATCH 03/11] Add send of op and remove peers from cache when needed --- .../handlers/operation_handler/propagation.rs | 100 ++++++++++++++++-- 1 file changed, 91 insertions(+), 9 deletions(-) diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs b/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs index de8400c5570..f11f77e9fb1 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs @@ -1,11 +1,13 @@ -use std::thread::JoinHandle; +use std::{mem, thread::JoinHandle}; -use crossbeam::channel::Receiver; +use crossbeam::channel::{Receiver, RecvTimeoutError}; +use massa_logging::massa_trace; use massa_models::operation::OperationId; use massa_protocol_exports_2::ProtocolConfig; -use peernet::network_manager::SharedActiveConnections; +use peernet::{network_manager::SharedActiveConnections, peer_id::PeerId}; +use tracing::debug; -use crate::messages::MessagesSerializer; +use crate::{handlers::operation_handler::OperationMessage, messages::MessagesSerializer}; use super::{ cache::SharedOperationCache, commands_propagation::OperationHandlerCommand, @@ -14,30 +16,107 @@ use super::{ struct PropagationThread { internal_receiver: Receiver, + active_connections: SharedActiveConnections, //TODO: Add pruning operations_to_announce: Vec, config: ProtocolConfig, cache: SharedOperationCache, + operation_message_serializer: MessagesSerializer, } impl PropagationThread { fn run(&mut self) { - let operation_serializer = MessagesSerializer::new() - .with_operation_message_serializer(OperationMessageSerializer::new()); + let mut next_announce = std::time::Instant::now() + .checked_add(self.config.operation_announcement_interval.to_duration()) + .expect("Can't init interval op propagation"); loop { - match self.internal_receiver.recv() { + match self.internal_receiver.recv_deadline(next_announce) { Ok(internal_message) => match internal_message { OperationHandlerCommand::AnnounceOperations(operations_ids) => { self.operations_to_announce.extend(operations_ids); + if self.operations_to_announce.len() + > self.config.operation_announcement_buffer_capacity + { + self.announce_ops(); + next_announce = std::time::Instant::now() + .checked_add( + self.config.operation_announcement_interval.to_duration(), + ) + .expect("Can't init interval op propagation"); + } } }, - Err(err) => { - println!("Error: {:?}", err); + Err(RecvTimeoutError::Timeout) => { + self.announce_ops(); + next_announce = std::time::Instant::now() + .checked_add(self.config.operation_announcement_interval.to_duration()) + .expect("Can't init interval op propagation"); + } + Err(RecvTimeoutError::Disconnected) => { return; } } } } + + fn announce_ops(&mut self) { + // Quit if empty to avoid iterating on nodes + if self.operations_to_announce.is_empty() { + return; + } + let operation_ids = mem::take(&mut self.operations_to_announce); + massa_trace!("protocol.protocol_worker.announce_ops.begin", { + "operation_ids": operation_ids + }); + { + let mut cache_write = self.cache.write(); + let peers: Vec = cache_write + .ops_known_by_peer + .iter() + .map(|(id, _)| id.clone()) + .collect(); + // Clean shared cache if peers do not exist anymore + for peer_id in peers { + if !self + .active_connections + .read() + .connections + .contains_key(&peer_id) + { + cache_write.ops_known_by_peer.pop(&peer_id); + } + } + // Propagate to peers + for (peer_id, ops) in cache_write.ops_known_by_peer.iter_mut() { + let new_ops: Vec = operation_ids + .iter() + .filter(|id| !ops.contains(&id.prefix())) + .copied() + .collect(); + if !new_ops.is_empty() { + ops.extend(new_ops.iter().map(|id| id.prefix())); + { + let mut active_connections = self.active_connections.write(); + if let Some(connection) = active_connections.connections.get_mut(peer_id) { + if let Err(err) = connection.send_channels.send( + &self.operation_message_serializer, + OperationMessage::OperationsAnnouncement( + new_ops.iter().map(|id| id.into_prefix()).collect(), + ) + .into(), + false, + ) { + debug!( + "could not send operation batch to node {}: {}", + peer_id, err + ); + } + } + } + } + } + } + } } pub fn start_propagation_thread( @@ -49,9 +128,12 @@ pub fn start_propagation_thread( std::thread::spawn(move || { let mut propagation_thread = PropagationThread { internal_receiver, + active_connections, operations_to_announce: Vec::new(), config, cache, + operation_message_serializer: MessagesSerializer::new() + .with_operation_message_serializer(OperationMessageSerializer::new()), }; propagation_thread.run(); }) From 84e580c05c98e502da78251d5ca9ea32f9cfbf49 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Thu, 13 Apr 2023 02:21:30 +0200 Subject: [PATCH 04/11] Update lock --- Cargo.lock | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 0bd07b96433..4e653613e30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2341,6 +2341,15 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "lru" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03f1160296536f10c833a82dca22267d5486734230d47bf00bf435885814ba1e" +dependencies = [ + "hashbrown 0.13.2", +] + [[package]] name = "lz4-sys" version = "1.9.4" @@ -3135,19 +3144,23 @@ name = "massa_protocol_worker_2" version = "0.1.0" dependencies = [ "crossbeam", + "lru", "massa_consensus_exports", "massa_hash 0.1.0", + "massa_logging", "massa_models", "massa_pool_exports", "massa_protocol_exports_2", "massa_serialization 0.1.0", "massa_signature 0.1.0", "massa_storage", + "massa_time", "nom", "num_enum", "parking_lot", "peernet", "rand 0.8.5", + "rayon", "serde_json", "tempfile", "tracing", From 5a4e1a7dc5bfa6a5ce4fd9b76316501473f134d7 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Thu, 13 Apr 2023 13:01:43 +0200 Subject: [PATCH 05/11] Add management of messages received from retrieval. --- Cargo.lock | 26 +- .../src/handlers/operation_handler/cache.rs | 8 + .../handlers/operation_handler/propagation.rs | 7 + .../handlers/operation_handler/retrieval.rs | 229 ++++++++++++++++-- 4 files changed, 243 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4e653613e30..5029e8f6450 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -237,9 +237,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.6.14" +version = "0.6.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e5abc76e6ef57bf438c067d2898481475994102eeb5d662a0b2162d0c2fdcf1" +checksum = "3b32c5ea3aabaf4deb5f5ced2d688ec0844c881c9e6c696a8b769a05fc691e62" dependencies = [ "async-trait", "axum-core", @@ -3736,7 +3736,7 @@ checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" [[package]] name = "peernet" version = "0.1.0" -source = "git+https://github.com/massalabs/PeerNet#9766c89c407fe504d7a6e2a1761537558224d931" +source = "git+https://github.com/massalabs/PeerNet#34404c0fae38fa01de126084e9c1cebd1ce5c7c4" dependencies = [ "crossbeam", "enum_delegate", @@ -3981,9 +3981,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e48e50df39172a3e7eb17e14642445da64996989bc212b583015435d39a58537" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" dependencies = [ "bytes", "prost-derive", @@ -3991,9 +3991,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c828f93f5ca4826f97fedcbd3f9a536c16b12cff3dbbb4a007f932bbad95b12" +checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" dependencies = [ "bytes", "heck 0.4.1", @@ -4013,9 +4013,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ea9b0f8cbe5e15a8a042d030bd96668db28ecb567ec37d691971ff5731d2b1b" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" dependencies = [ "anyhow", "itertools", @@ -4026,9 +4026,9 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "379119666929a1afd7a043aa6cf96fa67a6dce9af60c88095a4686dbce4c9c88" +checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" dependencies = [ "prost", ] @@ -4609,9 +4609,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.95" +version = "1.0.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d721eca97ac802aa7777b701877c8004d950fc142651367300d21c1cc0194744" +checksum = "057d394a50403bcac12672b2b18fb387ab6d289d957dab67dd201875391e52f1" dependencies = [ "itoa", "ryu", diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/cache.rs b/massa-protocol-worker-2/src/handlers/operation_handler/cache.rs index 308f160e9de..7ae1fb3432b 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/cache.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/cache.rs @@ -7,6 +7,7 @@ use peernet::peer_id::PeerId; pub struct OperationCache { pub checked_operations: LruCache, + pub checked_operations_prefix: LruCache, pub ops_known_by_peer: LruCache>, } @@ -14,9 +15,16 @@ impl OperationCache { pub fn new(max_known_ops: NonZeroUsize, max_known_ops_by_peer: NonZeroUsize) -> Self { Self { checked_operations: LruCache::new(max_known_ops), + checked_operations_prefix: LruCache::new(max_known_ops), ops_known_by_peer: LruCache::new(max_known_ops_by_peer), } } + + pub fn insert_checked_operation(&mut self, operation_id: OperationId) { + self.checked_operations.put(operation_id, ()); + self.checked_operations_prefix + .put(operation_id.prefix(), ()); + } } pub type SharedOperationCache = Arc>; diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs b/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs index f11f77e9fb1..043e0203c25 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs @@ -33,6 +33,13 @@ impl PropagationThread { match self.internal_receiver.recv_deadline(next_announce) { Ok(internal_message) => match internal_message { OperationHandlerCommand::AnnounceOperations(operations_ids) => { + // Note operations as checked. + { + let mut cache_write = self.cache.write(); + for op_id in operations_ids.iter().copied() { + cache_write.insert_checked_operation(op_id); + } + } self.operations_to_announce.extend(operations_ids); if self.operations_to_announce.len() > self.config.operation_announcement_buffer_capacity diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs b/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs index fd03d347bf3..f8ca4e3b4ae 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs @@ -1,9 +1,13 @@ -use std::thread::JoinHandle; +use std::{ + collections::{HashSet, VecDeque}, + thread::JoinHandle, + time::Instant, +}; -use crossbeam::channel::{Receiver, Sender}; +use crossbeam::channel::{Receiver, RecvTimeoutError, Sender}; use massa_logging::massa_trace; use massa_models::{ - operation::{OperationId, SecureShareOperation}, + operation::{OperationId, OperationPrefixId, OperationPrefixIds, SecureShareOperation}, prehash::{CapacityAllocator, PreHashMap, PreHashSet}, secure_share::Id, slot::Slot, @@ -13,7 +17,7 @@ use massa_pool_exports::PoolController; use massa_protocol_exports_2::{ProtocolConfig, ProtocolError}; use massa_serialization::{DeserializeError, Deserializer}; use massa_storage::Storage; -use massa_time::MassaTime; +use massa_time::{MassaTime, TimeError}; use peernet::peer_id::PeerId; use crate::sig_verifier::verify_sigs_batch; @@ -25,10 +29,24 @@ use super::{ messages::{OperationMessage, OperationMessageDeserializer, OperationMessageDeserializerArgs}, }; +/// Structure containing a Batch of `operation_ids` we would like to ask +/// to a `peer_id` now or later. Mainly used in protocol and translated into +/// simple combination of a `peer_id` and `operations_prefix_ids` +pub struct OperationBatchItem { + /// last updated at instant + pub instant: Instant, + /// node id + pub peer_id: PeerId, + /// operation prefix ids + pub operations_prefix_ids: OperationPrefixIds, +} + pub struct RetrievalThread { receiver: Receiver<(PeerId, u64, Vec)>, pool_controller: Box, cache: SharedOperationCache, + asked_operations: PreHashMap)>, + op_batch_buffer: VecDeque, storage: Storage, config: ProtocolConfig, internal_sender: Sender, @@ -49,8 +67,11 @@ impl RetrievalThread { max_op_datastore_key_length: u8::MAX, max_op_datastore_value_length: u64::MAX, }); + let mut next_ask_operations = Instant::now() + .checked_add(self.config.operation_batch_proc_period.to_duration()) + .expect("Can't add duration operation_batch_proc_period"); loop { - match self.receiver.recv() { + match self.receiver.recv_deadline(next_ask_operations) { Ok((peer_id, message_id, message)) => { operation_message_deserializer.set_message_id(message_id); let (rest, message) = operation_message_deserializer @@ -65,14 +86,33 @@ impl RetrievalThread { if let Err(err) = self.note_operations_from_peer(ops, &peer_id) { warn!("peer {} sent us critically incorrect operation, which may be an attack attempt by the remote peer or a loss of sync between us and the remote peer. Err = {}", peer_id, err); //TODO: Ban - //let _ = self.ban_node(&node_id).await; + //let _ = self.ban_node(&peer_id).await; + } + } + OperationMessage::OperationsAnnouncement(announcement) => { + if let Err(err) = + self.on_operations_announcements_received(announcement, &peer_id) + { + warn!("error when processing announcement received from peer {}: Err = {}", peer_id, err); + } + } + OperationMessage::AskForOperations(ask) => { + if let Err(err) = self.on_asked_operations_received(&peer_id, ask) { + warn!("error when processing asked operations received from peer {}: Err = {}", peer_id, err); } } - _ => todo!(), } } - Err(err) => { - println!("Error: {:?}", err); + Err(RecvTimeoutError::Timeout) => { + if let Err(err) = self.update_ask_operation() { + warn!("Error in update_ask_operation: {}", err); + }; + next_ask_operations = Instant::now() + .checked_add(self.config.operation_batch_proc_period.to_duration()) + .expect("Can't add duration operation_batch_proc_period"); + } + Err(RecvTimeoutError::Disconnected) => { + println!("Disconnected"); return; } } @@ -118,14 +158,15 @@ impl RetrievalThread { { // add to checked operations let mut cache_write = self.cache.write(); - for op in new_operations.keys().copied() { - cache_write.checked_operations.put(op, ()); + for op_id in new_operations.keys().copied() { + cache_write.insert_checked_operation(op_id); } // add to known ops - if let Some(known_ops) = cache_write.ops_known_by_peer.get_mut(source_peer_id) { - known_ops.extend(received_ids.iter().map(|id| id.prefix())); - } + let known_ops = cache_write + .ops_known_by_peer + .get_or_insert_mut(source_peer_id.clone(), || HashSet::default()); + known_ops.extend(received_ids.iter().map(|id| id.prefix())); } if !new_operations.is_empty() { @@ -174,6 +215,164 @@ impl RetrievalThread { Ok(()) } + + /// On receive a batch of operation ids `op_batch` from another `peer_id` + /// Execute the following algorithm: [redirect to GitHub](https://github.com/massalabs/massa/issues/2283#issuecomment-1040872779) + /// + ///```py + ///def process_op_batch(op_batch, peer_id): + /// ask_set = void HashSet + /// future_set = void HashSet + /// for op_id in op_batch: + /// if not is_op_received(op_id): + /// if (op_id not in asked_ops) or (peer_id not in asked_ops(op_id)[1]): + /// if (op_id not in asked_ops) or (asked_ops(op_id)[0] < now - op_batch_proc_period: + /// ask_set.add(op_id) + /// asked_ops(op_id)[0] = now + /// asked_ops(op_id)[1].add(peer_id) + /// else: + /// future_set.add(op_id) + /// if op_batch_buf is not full: + /// op_batch_buf.push(now+op_batch_proc_period, peer_id, future_set) + /// ask ask_set to peer_id + ///``` + fn on_operations_announcements_received( + &mut self, + mut op_batch: OperationPrefixIds, + peer_id: &PeerId, + ) -> Result<(), ProtocolError> { + // mark sender as knowing the ops + { + let mut cache_write = self.cache.write(); + let known_ops = cache_write + .ops_known_by_peer + .get_or_insert_mut(peer_id.clone(), || HashSet::default()); + known_ops.extend(op_batch.iter().copied()); + } + + // filter out the operations that we already know about + { + let cache_read = self.cache.read(); + op_batch.retain(|prefix| !cache_read.checked_operations_prefix.contains(prefix)); + } + + let mut ask_set = OperationPrefixIds::with_capacity(op_batch.len()); + let mut future_set = OperationPrefixIds::with_capacity(op_batch.len()); + // exactitude isn't important, we want to have a now for that function call + let now = Instant::now(); + let mut count_reask = 0; + for op_id in op_batch { + let wish = match self.asked_operations.get_mut(&op_id) { + Some(wish) => { + if wish.1.contains(&peer_id) { + continue; // already asked to the `peer_id` + } else { + Some(wish) // already asked but at someone else + } + } + None => None, + }; + if let Some(wish) = wish { + // Ask now if latest ask instant < now - operation_batch_proc_period + // otherwise add in future_set + if wish.0 + < now + .checked_sub(self.config.operation_batch_proc_period.into()) + .ok_or(TimeError::TimeOverflowError)? + { + count_reask += 1; + ask_set.insert(op_id); + wish.0 = now; + wish.1.push(peer_id.clone()); + } else { + future_set.insert(op_id); + } + } else { + ask_set.insert(op_id); + self.asked_operations + .insert(op_id, (now, vec![peer_id.clone()])); + } + } // EndOf for op_id in op_batch: + + if count_reask > 0 { + massa_trace!("re-ask operations.", { "count": count_reask }); + } + if self.op_batch_buffer.len() < self.config.operation_batch_buffer_capacity + && !future_set.is_empty() + { + self.op_batch_buffer.push_back(OperationBatchItem { + instant: now + .checked_add(self.config.operation_batch_proc_period.into()) + .ok_or(TimeError::TimeOverflowError)?, + peer_id: peer_id.clone(), + operations_prefix_ids: future_set, + }); + } + + //TODO: Wait damir's answer on Discord + // if !ask_set.is_empty() { + // self.network_command_sender + // .send_ask_for_operations(peer_id, ask_set) + // .await + // .map_err(|_| ProtocolError::ChannelError("send ask for operations failed".into())) + // } else { + // Ok(()) + // } + Ok(()) + } + + fn update_ask_operation(&mut self) -> Result<(), ProtocolError> { + let now = Instant::now(); + while !self.op_batch_buffer.is_empty() + // This unwrap is ok because we checked that it's not empty just before. + && now >= self.op_batch_buffer.front().unwrap().instant + { + let op_batch_item = self.op_batch_buffer.pop_front().unwrap(); + self.on_operations_announcements_received( + op_batch_item.operations_prefix_ids, + &op_batch_item.peer_id, + )?; + } + Ok(()) + } + + /// Process the reception of a batch of asked operations, that means that + /// we have already sent a batch of ids in the network, notifying that we already + /// have those operations. + fn on_asked_operations_received( + &mut self, + _peer_id: &PeerId, + op_pre_ids: OperationPrefixIds, + ) -> Result<(), ProtocolError> { + if op_pre_ids.is_empty() { + return Ok(()); + } + + let mut ops: Vec = Vec::with_capacity(op_pre_ids.len()); + { + // Scope the lock because of the async call to `send_operations` below. + let stored_ops = self.storage.read_operations(); + for prefix in op_pre_ids { + let opt_op = match stored_ops + .get_operations_by_prefix(&prefix) + .and_then(|ids| ids.iter().next()) + { + Some(id) => stored_ops.get(id), + None => continue, + }; + if let Some(op) = opt_op { + ops.push(op.clone()); + } + } + } + //TODO: See with damir + // if !ops.is_empty() { + // self.network_command_sender + // .send_operations(node_id, ops) + // .await?; + // } + Ok(()) + } } pub fn start_retrieval_thread( @@ -192,6 +391,8 @@ pub fn start_retrieval_thread( internal_sender, cache, config, + asked_operations: PreHashMap::default(), + op_batch_buffer: VecDeque::new(), }; retrieval_thread.run(); }) From e814860e234cf472ca74c02b838f0126246e89f9 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Thu, 13 Apr 2023 13:38:59 +0200 Subject: [PATCH 06/11] Change timers management and add pruning --- .../handlers/operation_handler/retrieval.rs | 85 ++++++++++++++++--- 1 file changed, 74 insertions(+), 11 deletions(-) diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs b/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs index f8ca4e3b4ae..7d225e0aae8 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashSet, VecDeque}, + collections::{HashSet, VecDeque, BTreeSet}, thread::JoinHandle, time::Instant, }; @@ -41,12 +41,53 @@ pub struct OperationBatchItem { pub operations_prefix_ids: OperationPrefixIds, } +enum RetrievalTimer { + NextAsk(Instant), + NextPrune(Instant), +} + +impl RetrievalTimer { + fn get_instant(&self) -> Instant { + match self { + RetrievalTimer::NextAsk(instant) => *instant, + RetrievalTimer::NextPrune(instant) => *instant, + } + } +} + +impl Eq for RetrievalTimer {} + +impl PartialEq for RetrievalTimer { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == std::cmp::Ordering::Equal + } +} + +impl PartialOrd for RetrievalTimer { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for RetrievalTimer { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + match (self, other) { + (RetrievalTimer::NextAsk(_), RetrievalTimer::NextAsk(_)) => std::cmp::Ordering::Equal, + (RetrievalTimer::NextPrune(_), RetrievalTimer::NextPrune(_)) => { + std::cmp::Ordering::Equal + } + (a, b) => a.get_instant().cmp(&b.get_instant()), + } + } +} + pub struct RetrievalThread { receiver: Receiver<(PeerId, u64, Vec)>, pool_controller: Box, cache: SharedOperationCache, asked_operations: PreHashMap)>, op_batch_buffer: VecDeque, + timers: BTreeSet, storage: Storage, config: ProtocolConfig, internal_sender: Sender, @@ -67,11 +108,16 @@ impl RetrievalThread { max_op_datastore_key_length: u8::MAX, max_op_datastore_value_length: u64::MAX, }); - let mut next_ask_operations = Instant::now() - .checked_add(self.config.operation_batch_proc_period.to_duration()) - .expect("Can't add duration operation_batch_proc_period"); + self.timers.insert(RetrievalTimer::NextAsk(Instant::now() + .checked_add(self.config.operation_batch_proc_period.to_duration()) + .expect("Can't add duration operation_batch_proc_period"))); + self.timers.insert(RetrievalTimer::NextPrune(Instant::now().checked_add( + self.config.asked_operations_pruning_period.to_duration(), + ).expect("Can't add duration operation_retrieval_prune_period"))); loop { - match self.receiver.recv_deadline(next_ask_operations) { + let timer = self.timers.first().expect("No timers left in operation retrieval"); + //If there is message in the channel it will receive them even if the deadline is in the past + match self.receiver.recv_deadline(timer.get_instant()) { Ok((peer_id, message_id, message)) => { operation_message_deserializer.set_message_id(message_id); let (rest, message) = operation_message_deserializer @@ -104,12 +150,28 @@ impl RetrievalThread { } } Err(RecvTimeoutError::Timeout) => { - if let Err(err) = self.update_ask_operation() { - warn!("Error in update_ask_operation: {}", err); - }; - next_ask_operations = Instant::now() - .checked_add(self.config.operation_batch_proc_period.to_duration()) - .expect("Can't add duration operation_batch_proc_period"); + match timer { + RetrievalTimer::NextAsk(_) => { + self.timers.pop_first(); + if let Err(err) = self.update_ask_operation() { + warn!("Error in update_ask_operation: {}", err); + }; + let next_ask_operations = Instant::now() + .checked_add(self.config.operation_batch_proc_period.to_duration()) + .expect("Can't add duration operation_batch_proc_period"); + self.timers + .insert(RetrievalTimer::NextAsk(next_ask_operations)); + } + RetrievalTimer::NextPrune(_) => { + self.timers.pop_first(); + self.asked_operations.clear(); + let next_prune_operations = Instant::now() + .checked_add(self.config.asked_operations_pruning_period.to_duration()) + .expect("Can't add duration operation_prune_period"); + self.timers + .insert(RetrievalTimer::NextPrune(next_prune_operations)); + } + } } Err(RecvTimeoutError::Disconnected) => { println!("Disconnected"); @@ -393,6 +455,7 @@ pub fn start_retrieval_thread( config, asked_operations: PreHashMap::default(), op_batch_buffer: VecDeque::new(), + timers: BTreeSet::default() }; retrieval_thread.run(); }) From ea36640e1d37e99fd9b4c1b5361cb737423d60d1 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Fri, 14 Apr 2023 01:10:20 +0200 Subject: [PATCH 07/11] Update caches and add storage to keep ops to be propagated --- massa-protocol-exports-2/src/settings.rs | 4 + .../src/test_exports/config.rs | 2 + massa-protocol-worker-2/src/controller.rs | 1 + .../src/handlers/block_handler/mod.rs | 2 +- .../endorsement_handler/propagation.rs | 1 + .../src/handlers/operation_handler/cache.rs | 4 +- .../operation_handler/commands_propagation.rs | 2 +- .../src/handlers/operation_handler/mod.rs | 3 +- .../handlers/operation_handler/propagation.rs | 5 +- .../handlers/operation_handler/retrieval.rs | 287 +++++++++--------- massa-protocol-worker-2/src/messages.rs | 32 +- massa-protocol-worker-2/src/worker.rs | 2 +- 12 files changed, 182 insertions(+), 163 deletions(-) diff --git a/massa-protocol-exports-2/src/settings.rs b/massa-protocol-exports-2/src/settings.rs index 12cfa0183d2..94a58a0fa58 100644 --- a/massa-protocol-exports-2/src/settings.rs +++ b/massa-protocol-exports-2/src/settings.rs @@ -50,10 +50,14 @@ pub struct ProtocolConfig { pub operation_announcement_buffer_capacity: usize, /// Start processing batches in the buffer each `operation_batch_proc_period` in millisecond pub operation_batch_proc_period: MassaTime, + /// Maximum number of asked operations in the memory buffer. + pub asked_operations_buffer_capacity: usize, /// All operations asked are prune each `operation_asked_pruning_period` millisecond pub asked_operations_pruning_period: MassaTime, /// Interval at which operations are announced in batches. pub operation_announcement_interval: MassaTime, + /// Maximum time we keep an operation in the storage + pub max_operation_storage_time: MassaTime, /// Maximum of operations sent in one message. pub max_operations_per_message: u64, /// Maximum size in bytes of all serialized operations size in a block diff --git a/massa-protocol-exports-2/src/test_exports/config.rs b/massa-protocol-exports-2/src/test_exports/config.rs index aadc88545eb..667b4ce8863 100644 --- a/massa-protocol-exports-2/src/test_exports/config.rs +++ b/massa-protocol-exports-2/src/test_exports/config.rs @@ -23,7 +23,9 @@ impl Default for ProtocolConfig { max_node_known_endorsements_size: 1000, operation_batch_buffer_capacity: 1000, operation_announcement_buffer_capacity: 1000, + max_operation_storage_time: MassaTime::from_millis(60000), operation_batch_proc_period: 200.into(), + asked_operations_buffer_capacity: 10000, asked_operations_pruning_period: 500.into(), operation_announcement_interval: 150.into(), max_operations_per_message: 1024, diff --git a/massa-protocol-worker-2/src/controller.rs b/massa-protocol-worker-2/src/controller.rs index ce4dc03d2ca..5664d4f5942 100644 --- a/massa-protocol-worker-2/src/controller.rs +++ b/massa-protocol-worker-2/src/controller.rs @@ -72,6 +72,7 @@ impl ProtocolController for ProtocolControllerImpl { /// /// note: Full `OperationId` is replaced by a `OperationPrefixId` later by the worker. fn propagate_operations(&self, operations: Storage) -> Result<(), ProtocolError> { + //TODO: Change when send will be in propagation let operations = operations.get_op_refs().clone(); self.sender_operation_handler .send(OperationHandlerCommand::AnnounceOperations(operations)) diff --git a/massa-protocol-worker-2/src/handlers/block_handler/mod.rs b/massa-protocol-worker-2/src/handlers/block_handler/mod.rs index 9df2d96dce3..392f7bee80e 100644 --- a/massa-protocol-worker-2/src/handlers/block_handler/mod.rs +++ b/massa-protocol-worker-2/src/handlers/block_handler/mod.rs @@ -83,7 +83,7 @@ impl BlockHandler { }); let block_propagation_thread = std::thread::spawn({ - let _active_connections = active_connections.clone(); + let _active_connections = active_connections; move || { let _block_message_serializer = BlockMessageSerializer::new(); //TODO: Real logic diff --git a/massa-protocol-worker-2/src/handlers/endorsement_handler/propagation.rs b/massa-protocol-worker-2/src/handlers/endorsement_handler/propagation.rs index 6535ad1befd..bc0b9249f3e 100644 --- a/massa-protocol-worker-2/src/handlers/endorsement_handler/propagation.rs +++ b/massa-protocol-worker-2/src/handlers/endorsement_handler/propagation.rs @@ -30,6 +30,7 @@ pub fn start_propagation_thread( match internal_receiver.recv() { Ok(internal_message) => { match internal_message { + //TODO: Batch with timer 0 InternalMessage::PropagateEndorsements((from_peer_id, endorsements)) => { // Add endorsements received as known by the sender peer let cached_endorsements = propagation_thread diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/cache.rs b/massa-protocol-worker-2/src/handlers/operation_handler/cache.rs index 7ae1fb3432b..27f0998b3ba 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/cache.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/cache.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, num::NonZeroUsize, sync::Arc}; +use std::{num::NonZeroUsize, sync::Arc}; use lru::LruCache; use massa_models::operation::{OperationId, OperationPrefixId}; @@ -8,7 +8,7 @@ use peernet::peer_id::PeerId; pub struct OperationCache { pub checked_operations: LruCache, pub checked_operations_prefix: LruCache, - pub ops_known_by_peer: LruCache>, + pub ops_known_by_peer: LruCache>, } impl OperationCache { diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/commands_propagation.rs b/massa-protocol-worker-2/src/handlers/operation_handler/commands_propagation.rs index 1cdf62018f5..066ca2eb652 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/commands_propagation.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/commands_propagation.rs @@ -2,6 +2,6 @@ use massa_models::{operation::OperationId, prehash::PreHashSet}; #[derive(Clone)] pub enum OperationHandlerCommand { - /// (From peer id (optional, can come from API or other modules)), operations ids) + /// operations ids AnnounceOperations(PreHashSet), } diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/mod.rs b/massa-protocol-worker-2/src/handlers/operation_handler/mod.rs index d04c1b07c4d..78038af18ba 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/mod.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/mod.rs @@ -39,9 +39,10 @@ impl OperationHandler { let operation_retrieval_thread = start_retrieval_thread( receiver_network, pool_controller, - storage, + storage.clone_without_refs(), config.clone(), cache.clone(), + active_connections.clone(), local_sender, ); diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs b/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs index 043e0203c25..b5e6859103e 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs @@ -17,7 +17,6 @@ use super::{ struct PropagationThread { internal_receiver: Receiver, active_connections: SharedActiveConnections, - //TODO: Add pruning operations_to_announce: Vec, config: ProtocolConfig, cache: SharedOperationCache, @@ -101,7 +100,9 @@ impl PropagationThread { .copied() .collect(); if !new_ops.is_empty() { - ops.extend(new_ops.iter().map(|id| id.prefix())); + for id in &new_ops { + ops.put(id.prefix(), ()); + } { let mut active_connections = self.active_connections.write(); if let Some(connection) = active_connections.connections.get_mut(peer_id) { diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs b/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs index 7d225e0aae8..4afb2ba75e8 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs @@ -1,10 +1,15 @@ use std::{ - collections::{HashSet, VecDeque, BTreeSet}, + collections::{HashMap, VecDeque}, + num::NonZeroUsize, thread::JoinHandle, time::Instant, }; -use crossbeam::channel::{Receiver, RecvTimeoutError, Sender}; +use crossbeam::{ + channel::{tick, Receiver, Sender}, + select, +}; +use lru::LruCache; use massa_logging::massa_trace; use massa_models::{ operation::{OperationId, OperationPrefixId, OperationPrefixIds, SecureShareOperation}, @@ -18,15 +23,16 @@ use massa_protocol_exports_2::{ProtocolConfig, ProtocolError}; use massa_serialization::{DeserializeError, Deserializer}; use massa_storage::Storage; use massa_time::{MassaTime, TimeError}; -use peernet::peer_id::PeerId; +use peernet::{network_manager::SharedActiveConnections, peer_id::PeerId}; -use crate::sig_verifier::verify_sigs_batch; +use crate::{messages::MessagesSerializer, sig_verifier::verify_sigs_batch}; use tracing::warn; use super::{ cache::SharedOperationCache, commands_propagation::OperationHandlerCommand, messages::{OperationMessage, OperationMessageDeserializer, OperationMessageDeserializerArgs}, + OperationMessageSerializer, }; /// Structure containing a Batch of `operation_ids` we would like to ask @@ -41,56 +47,18 @@ pub struct OperationBatchItem { pub operations_prefix_ids: OperationPrefixIds, } -enum RetrievalTimer { - NextAsk(Instant), - NextPrune(Instant), -} - -impl RetrievalTimer { - fn get_instant(&self) -> Instant { - match self { - RetrievalTimer::NextAsk(instant) => *instant, - RetrievalTimer::NextPrune(instant) => *instant, - } - } -} - -impl Eq for RetrievalTimer {} - -impl PartialEq for RetrievalTimer { - fn eq(&self, other: &Self) -> bool { - self.cmp(other) == std::cmp::Ordering::Equal - } -} - -impl PartialOrd for RetrievalTimer { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for RetrievalTimer { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - match (self, other) { - (RetrievalTimer::NextAsk(_), RetrievalTimer::NextAsk(_)) => std::cmp::Ordering::Equal, - (RetrievalTimer::NextPrune(_), RetrievalTimer::NextPrune(_)) => { - std::cmp::Ordering::Equal - } - (a, b) => a.get_instant().cmp(&b.get_instant()), - } - } -} - pub struct RetrievalThread { receiver: Receiver<(PeerId, u64, Vec)>, pool_controller: Box, cache: SharedOperationCache, - asked_operations: PreHashMap)>, + asked_operations: LruCache)>, + active_connections: SharedActiveConnections, op_batch_buffer: VecDeque, - timers: BTreeSet, + stored_operations: HashMap>, storage: Storage, config: ProtocolConfig, internal_sender: Sender, + operation_message_serializer: MessagesSerializer, } impl RetrievalThread { @@ -108,75 +76,67 @@ impl RetrievalThread { max_op_datastore_key_length: u8::MAX, max_op_datastore_value_length: u64::MAX, }); - self.timers.insert(RetrievalTimer::NextAsk(Instant::now() - .checked_add(self.config.operation_batch_proc_period.to_duration()) - .expect("Can't add duration operation_batch_proc_period"))); - self.timers.insert(RetrievalTimer::NextPrune(Instant::now().checked_add( - self.config.asked_operations_pruning_period.to_duration(), - ).expect("Can't add duration operation_retrieval_prune_period"))); + let tick_ask_operations = tick(self.config.operation_batch_proc_period.to_duration()); + let tick_clear_storage = tick(self.config.asked_operations_pruning_period.to_duration()); loop { - let timer = self.timers.first().expect("No timers left in operation retrieval"); - //If there is message in the channel it will receive them even if the deadline is in the past - match self.receiver.recv_deadline(timer.get_instant()) { - Ok((peer_id, message_id, message)) => { - operation_message_deserializer.set_message_id(message_id); - let (rest, message) = operation_message_deserializer - .deserialize::(&message) - .unwrap(); - if !rest.is_empty() { - println!("Error: message not fully consumed"); - return; - } - match message { - OperationMessage::Operations(ops) => { - if let Err(err) = self.note_operations_from_peer(ops, &peer_id) { - warn!("peer {} sent us critically incorrect operation, which may be an attack attempt by the remote peer or a loss of sync between us and the remote peer. Err = {}", peer_id, err); - //TODO: Ban - //let _ = self.ban_node(&peer_id).await; + select! { + recv(self.receiver) -> msg => { + match msg { + Ok((peer_id, message_id, message)) => { + operation_message_deserializer.set_message_id(message_id); + let (rest, message) = operation_message_deserializer + .deserialize::(&message) + .unwrap(); + if !rest.is_empty() { + println!("Error: message not fully consumed"); + return; } - } - OperationMessage::OperationsAnnouncement(announcement) => { - if let Err(err) = - self.on_operations_announcements_received(announcement, &peer_id) - { - warn!("error when processing announcement received from peer {}: Err = {}", peer_id, err); + match message { + OperationMessage::Operations(ops) => { + if let Err(err) = self.note_operations_from_peer(ops, &peer_id) { + warn!("peer {} sent us critically incorrect operation, which may be an attack attempt by the remote peer or a loss of sync between us and the remote peer. Err = {}", peer_id, err); + //TODO: Ban + //let _ = self.ban_node(&peer_id).await; + } + } + OperationMessage::OperationsAnnouncement(announcement) => { + if let Err(err) = + self.on_operations_announcements_received(announcement, &peer_id) + { + warn!("error when processing announcement received from peer {}: Err = {}", peer_id, err); + } + } + OperationMessage::AskForOperations(ask) => { + if let Err(err) = self.on_asked_operations_received(&peer_id, ask) { + warn!("error when processing asked operations received from peer {}: Err = {}", peer_id, err); + } + } } } - OperationMessage::AskForOperations(ask) => { - if let Err(err) = self.on_asked_operations_received(&peer_id, ask) { - warn!("error when processing asked operations received from peer {}: Err = {}", peer_id, err); - } + Err(err) => { + warn!("Error in receiver: {}", err); + return; } } + }, + recv(tick_ask_operations) -> _ => { + if let Err(err) = self.update_ask_operation() { + warn!("Error in update_ask_operation: {}", err); + }; + }, + recv(tick_clear_storage) -> _ => { + self.clear_storage(); } - Err(RecvTimeoutError::Timeout) => { - match timer { - RetrievalTimer::NextAsk(_) => { - self.timers.pop_first(); - if let Err(err) = self.update_ask_operation() { - warn!("Error in update_ask_operation: {}", err); - }; - let next_ask_operations = Instant::now() - .checked_add(self.config.operation_batch_proc_period.to_duration()) - .expect("Can't add duration operation_batch_proc_period"); - self.timers - .insert(RetrievalTimer::NextAsk(next_ask_operations)); - } - RetrievalTimer::NextPrune(_) => { - self.timers.pop_first(); - self.asked_operations.clear(); - let next_prune_operations = Instant::now() - .checked_add(self.config.asked_operations_pruning_period.to_duration()) - .expect("Can't add duration operation_prune_period"); - self.timers - .insert(RetrievalTimer::NextPrune(next_prune_operations)); - } - } - } - Err(RecvTimeoutError::Disconnected) => { - println!("Disconnected"); - return; - } + } + } + } + + fn clear_storage(&mut self) { + for (instant, operations) in self.stored_operations.iter() { + if instant.elapsed() > self.config.asked_operations_pruning_period.to_duration() { + self.storage.drop_operation_refs(operations); + } else { + break; } } } @@ -225,10 +185,18 @@ impl RetrievalThread { } // add to known ops - let known_ops = cache_write - .ops_known_by_peer - .get_or_insert_mut(source_peer_id.clone(), || HashSet::default()); - known_ops.extend(received_ids.iter().map(|id| id.prefix())); + let known_ops = + cache_write + .ops_known_by_peer + .get_or_insert_mut(source_peer_id.clone(), || { + LruCache::new( + NonZeroUsize::new(self.config.max_known_ops_size) + .expect("max_known_ops_size in config must be > 0"), + ) + }); + for id in received_ids { + known_ops.put(id.prefix(), ()); + } } if !new_operations.is_empty() { @@ -268,6 +236,9 @@ impl RetrievalThread { ops_to_propagate.drop_operation_refs(&operations_to_not_propagate); let to_announce: PreHashSet = ops_to_propagate.get_op_refs().iter().copied().collect(); + self.stored_operations + .insert(Instant::now(), to_announce.clone()); + self.storage.extend(ops_to_propagate); self.internal_sender .send(OperationHandlerCommand::AnnounceOperations(to_announce)) .map_err(|err| ProtocolError::SendError(err.to_string()))?; @@ -306,10 +277,18 @@ impl RetrievalThread { // mark sender as knowing the ops { let mut cache_write = self.cache.write(); - let known_ops = cache_write - .ops_known_by_peer - .get_or_insert_mut(peer_id.clone(), || HashSet::default()); - known_ops.extend(op_batch.iter().copied()); + let known_ops = + cache_write + .ops_known_by_peer + .get_or_insert_mut(peer_id.clone(), || { + LruCache::new( + NonZeroUsize::new(self.config.max_known_ops_size) + .expect("max_known_ops_size in config must be > 0"), + ) + }); + for prefix in &op_batch { + known_ops.put(*prefix, ()); + } } // filter out the operations that we already know about @@ -326,7 +305,7 @@ impl RetrievalThread { for op_id in op_batch { let wish = match self.asked_operations.get_mut(&op_id) { Some(wish) => { - if wish.1.contains(&peer_id) { + if wish.1.contains(peer_id) { continue; // already asked to the `peer_id` } else { Some(wish) // already asked but at someone else @@ -352,7 +331,7 @@ impl RetrievalThread { } else { ask_set.insert(op_id); self.asked_operations - .insert(op_id, (now, vec![peer_id.clone()])); + .put(op_id, (now, vec![peer_id.clone()])); } } // EndOf for op_id in op_batch: @@ -370,17 +349,28 @@ impl RetrievalThread { operations_prefix_ids: future_set, }); } - - //TODO: Wait damir's answer on Discord - // if !ask_set.is_empty() { - // self.network_command_sender - // .send_ask_for_operations(peer_id, ask_set) - // .await - // .map_err(|_| ProtocolError::ChannelError("send ask for operations failed".into())) - // } else { - // Ok(()) - // } - Ok(()) + if !ask_set.is_empty() { + { + let mut active_connections_write = self.active_connections.write(); + if let Some(peer) = active_connections_write.connections.get_mut(peer_id) { + peer.send_channels + .send( + &self.operation_message_serializer, + OperationMessage::AskForOperations(ask_set).into(), + false, + ) + .map_err(|err| ProtocolError::SendError(err.to_string())) + } else { + { + let mut cache_write = self.cache.write(); + cache_write.ops_known_by_peer.pop(peer_id); + } + Ok(()) + } + } + } else { + Ok(()) + } } fn update_ask_operation(&mut self) -> Result<(), ProtocolError> { @@ -398,12 +388,13 @@ impl RetrievalThread { Ok(()) } + /// Maybe move this to propagation /// Process the reception of a batch of asked operations, that means that /// we have already sent a batch of ids in the network, notifying that we already /// have those operations. fn on_asked_operations_received( &mut self, - _peer_id: &PeerId, + peer_id: &PeerId, op_pre_ids: OperationPrefixIds, ) -> Result<(), ProtocolError> { if op_pre_ids.is_empty() { @@ -427,13 +418,24 @@ impl RetrievalThread { } } } - //TODO: See with damir - // if !ops.is_empty() { - // self.network_command_sender - // .send_operations(node_id, ops) - // .await?; - // } - Ok(()) + { + let mut active_connections_write = self.active_connections.write(); + if let Some(peer) = active_connections_write.connections.get_mut(peer_id) { + peer.send_channels + .send( + &self.operation_message_serializer, + OperationMessage::Operations(ops).into(), + false, + ) + .map_err(|err| ProtocolError::SendError(err.to_string())) + } else { + { + let mut cache_write = self.cache.write(); + cache_write.ops_known_by_peer.pop(peer_id); + } + Ok(()) + } + } } } @@ -443,19 +445,26 @@ pub fn start_retrieval_thread( storage: Storage, config: ProtocolConfig, cache: SharedOperationCache, + active_connections: SharedActiveConnections, internal_sender: Sender, ) -> JoinHandle<()> { std::thread::spawn(move || { let mut retrieval_thread = RetrievalThread { receiver, pool_controller, + stored_operations: HashMap::new(), storage, internal_sender, cache, + active_connections, + asked_operations: LruCache::new( + NonZeroUsize::new(config.asked_operations_buffer_capacity) + .expect("asked_operations_buffer_capacity in config must be > 0"), + ), config, - asked_operations: PreHashMap::default(), + operation_message_serializer: MessagesSerializer::new() + .with_operation_message_serializer(OperationMessageSerializer::new()), op_batch_buffer: VecDeque::new(), - timers: BTreeSet::default() }; retrieval_thread.run(); }) diff --git a/massa-protocol-worker-2/src/messages.rs b/massa-protocol-worker-2/src/messages.rs index 0fee086620a..2dfaec3d4f0 100644 --- a/massa-protocol-worker-2/src/messages.rs +++ b/massa-protocol-worker-2/src/messages.rs @@ -18,34 +18,34 @@ use crate::handlers::{ }; pub enum Message { - BlockMessage(BlockMessage), - EndorsementMessage(EndorsementMessage), - OperationMessage(OperationMessage), - PeerManagementMessage(PeerManagementMessage), + Block(BlockMessage), + Endorsement(EndorsementMessage), + Operation(OperationMessage), + PeerManagement(PeerManagementMessage), } //TODO: Macroize this impl From for Message { fn from(message: BlockMessage) -> Self { - Self::BlockMessage(message) + Self::Block(message) } } impl From for Message { fn from(message: EndorsementMessage) -> Self { - Self::EndorsementMessage(message) + Self::Endorsement(message) } } impl From for Message { fn from(message: OperationMessage) -> Self { - Self::OperationMessage(message) + Self::Operation(message) } } impl From for Message { fn from(message: PeerManagementMessage) -> Self { - Self::PeerManagementMessage(message) + Self::PeerManagement(message) } } @@ -53,14 +53,14 @@ impl Message { //TODO: Macroize get_id and max_id fn get_id(&self) -> u64 { match self { - Message::BlockMessage(message) => message.get_id() as u64, - Message::EndorsementMessage(message) => { + Message::Block(message) => message.get_id() as u64, + Message::Endorsement(message) => { message.get_id() as u64 + BlockMessage::max_id() } - Message::OperationMessage(message) => { + Message::Operation(message) => { message.get_id() as u64 + BlockMessage::max_id() + EndorsementMessage::max_id() } - Message::PeerManagementMessage(message) => { + Message::PeerManagement(message) => { message.get_id() as u64 + BlockMessage::max_id() + EndorsementMessage::max_id() @@ -137,7 +137,7 @@ impl PeerNetMessagesSerializer for MessagesSerializer { /// Serialize the message fn serialize(&self, message: &Message, buffer: &mut Vec) -> PeerNetResult<()> { match message { - Message::BlockMessage(message) => { + Message::Block(message) => { if let Some(serializer) = &self.block_message_serializer { serializer.serialize(message, buffer).map_err(|err| { PeerNetError::HandlerError.error( @@ -152,7 +152,7 @@ impl PeerNetMessagesSerializer for MessagesSerializer { )) } } - Message::EndorsementMessage(message) => { + Message::Endorsement(message) => { if let Some(serializer) = &self.endorsement_message_serializer { serializer.serialize(message, buffer).map_err(|err| { PeerNetError::HandlerError.error( @@ -167,7 +167,7 @@ impl PeerNetMessagesSerializer for MessagesSerializer { )) } } - Message::OperationMessage(message) => { + Message::Operation(message) => { if let Some(serializer) = &self.operation_message_serializer { serializer.serialize(message, buffer).map_err(|err| { PeerNetError::HandlerError.error( @@ -182,7 +182,7 @@ impl PeerNetMessagesSerializer for MessagesSerializer { )) } } - Message::PeerManagementMessage(message) => { + Message::PeerManagement(message) => { if let Some(serializer) = &self.peer_management_message_serializer { serializer.serialize(message, buffer).map_err(|err| { PeerNetError::HandlerError.error( diff --git a/massa-protocol-worker-2/src/worker.rs b/massa-protocol-worker-2/src/worker.rs index fb7c21d4871..7992cba910f 100644 --- a/massa-protocol-worker-2/src/worker.rs +++ b/massa-protocol-worker-2/src/worker.rs @@ -23,7 +23,7 @@ pub fn start_protocol_controller( debug!("starting protocol controller"); let (connectivity_thread_handle, controller) = - start_connectivity_thread(config.clone(), pool_controller, storage)?; + start_connectivity_thread(config, pool_controller, storage)?; let manager = ProtocolManagerImpl::new(connectivity_thread_handle); From 7c13e4d86415f9d930dcdbb4de06b515422c6d1b Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Fri, 14 Apr 2023 09:48:23 +0200 Subject: [PATCH 08/11] Remove useless line and add a comment --- massa-protocol-worker-2/src/handlers/block_handler/mod.rs | 1 - massa-protocol-worker-2/src/sig_verifier.rs | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/massa-protocol-worker-2/src/handlers/block_handler/mod.rs b/massa-protocol-worker-2/src/handlers/block_handler/mod.rs index 392f7bee80e..70780dcec4a 100644 --- a/massa-protocol-worker-2/src/handlers/block_handler/mod.rs +++ b/massa-protocol-worker-2/src/handlers/block_handler/mod.rs @@ -83,7 +83,6 @@ impl BlockHandler { }); let block_propagation_thread = std::thread::spawn({ - let _active_connections = active_connections; move || { let _block_message_serializer = BlockMessageSerializer::new(); //TODO: Real logic diff --git a/massa-protocol-worker-2/src/sig_verifier.rs b/massa-protocol-worker-2/src/sig_verifier.rs index 1ed1c470209..cd0bc7f459e 100644 --- a/massa-protocol-worker-2/src/sig_verifier.rs +++ b/massa-protocol-worker-2/src/sig_verifier.rs @@ -7,6 +7,7 @@ use massa_protocol_exports_2::ProtocolError; use massa_signature::{verify_signature_batch, PublicKey, Signature}; use rayon::{prelude::ParallelIterator, slice::ParallelSlice}; +//TODO: Benchmark /// Limit for small batch optimization const SMALL_BATCH_LIMIT: usize = 2; From 88f5e25a6517cdf52a7ea2542a53f28cb94011f4 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Fri, 14 Apr 2023 10:24:49 +0200 Subject: [PATCH 09/11] Remove useless write locks. --- .../src/handlers/operation_handler/mod.rs | 6 ++++-- .../src/handlers/operation_handler/propagation.rs | 14 ++++++-------- .../src/handlers/operation_handler/retrieval.rs | 15 +++++++++------ 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/mod.rs b/massa-protocol-worker-2/src/handlers/operation_handler/mod.rs index 78038af18ba..58e4f86a108 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/mod.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/mod.rs @@ -4,7 +4,7 @@ use crossbeam::channel::{Receiver, Sender}; use massa_pool_exports::PoolController; use massa_protocol_exports_2::ProtocolConfig; use massa_storage::Storage; -use peernet::{network_manager::SharedActiveConnections, peer_id::PeerId}; +use peernet::network_manager::SharedActiveConnections; use self::{ cache::SharedOperationCache, commands_propagation::OperationHandlerCommand, @@ -19,6 +19,8 @@ mod retrieval; pub(crate) use messages::{OperationMessage, OperationMessageSerializer}; +use super::peer_handler::models::PeerMessageTuple; + pub struct OperationHandler { pub operation_retrieval_thread: Option>, pub operation_propagation_thread: Option>, @@ -31,7 +33,7 @@ impl OperationHandler { config: ProtocolConfig, cache: SharedOperationCache, active_connections: SharedActiveConnections, - receiver_network: Receiver<(PeerId, u64, Vec)>, + receiver_network: Receiver, local_sender: Sender, local_receiver: Receiver, ) -> Self { diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs b/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs index b5e6859103e..359307940e5 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs @@ -82,14 +82,12 @@ impl PropagationThread { .map(|(id, _)| id.clone()) .collect(); // Clean shared cache if peers do not exist anymore - for peer_id in peers { - if !self - .active_connections - .read() - .connections - .contains_key(&peer_id) - { - cache_write.ops_known_by_peer.pop(&peer_id); + { + let active_connections_read = self.active_connections.read(); + for peer_id in peers { + if !active_connections_read.connections.contains_key(&peer_id) { + cache_write.ops_known_by_peer.pop(&peer_id); + } } } // Propagate to peers diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs b/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs index 4afb2ba75e8..87351a79d16 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/retrieval.rs @@ -25,7 +25,10 @@ use massa_storage::Storage; use massa_time::{MassaTime, TimeError}; use peernet::{network_manager::SharedActiveConnections, peer_id::PeerId}; -use crate::{messages::MessagesSerializer, sig_verifier::verify_sigs_batch}; +use crate::{ + handlers::peer_handler::models::PeerMessageTuple, messages::MessagesSerializer, + sig_verifier::verify_sigs_batch, +}; use tracing::warn; use super::{ @@ -48,7 +51,7 @@ pub struct OperationBatchItem { } pub struct RetrievalThread { - receiver: Receiver<(PeerId, u64, Vec)>, + receiver: Receiver, pool_controller: Box, cache: SharedOperationCache, asked_operations: LruCache)>, @@ -351,8 +354,8 @@ impl RetrievalThread { } if !ask_set.is_empty() { { - let mut active_connections_write = self.active_connections.write(); - if let Some(peer) = active_connections_write.connections.get_mut(peer_id) { + let active_connections_read = self.active_connections.read(); + if let Some(peer) = active_connections_read.connections.get(peer_id) { peer.send_channels .send( &self.operation_message_serializer, @@ -419,8 +422,8 @@ impl RetrievalThread { } } { - let mut active_connections_write = self.active_connections.write(); - if let Some(peer) = active_connections_write.connections.get_mut(peer_id) { + let active_connections_read = self.active_connections.read(); + if let Some(peer) = active_connections_read.connections.get(peer_id) { peer.send_channels .send( &self.operation_message_serializer, From 8c5ad2a2a0f396291520d1c32073395024dd6a2b Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Fri, 14 Apr 2023 14:39:54 +0200 Subject: [PATCH 10/11] Add basic code for endorsement handler --- massa-protocol-worker-2/src/connectivity.rs | 10 +- massa-protocol-worker-2/src/controller.rs | 2 +- .../src/handlers/endorsement_handler/cache.rs | 29 +++ .../{commands.rs => commands_propagation.rs} | 4 +- .../endorsement_handler/internal_messages.rs | 7 - .../src/handlers/endorsement_handler/mod.rs | 33 ++-- .../endorsement_handler/propagation.rs | 187 ++++++++++++------ .../handlers/endorsement_handler/retrieval.rs | 136 ++++++------- .../handlers/operation_handler/propagation.rs | 16 +- 9 files changed, 275 insertions(+), 149 deletions(-) create mode 100644 massa-protocol-worker-2/src/handlers/endorsement_handler/cache.rs rename massa-protocol-worker-2/src/handlers/endorsement_handler/{commands.rs => commands_propagation.rs} (50%) delete mode 100644 massa-protocol-worker-2/src/handlers/endorsement_handler/internal_messages.rs diff --git a/massa-protocol-worker-2/src/connectivity.rs b/massa-protocol-worker-2/src/connectivity.rs index 1152ea50648..6dcc90558a2 100644 --- a/massa-protocol-worker-2/src/connectivity.rs +++ b/massa-protocol-worker-2/src/connectivity.rs @@ -20,7 +20,7 @@ use crate::{ controller::ProtocolControllerImpl, handlers::{ block_handler::BlockHandler, - endorsement_handler::EndorsementHandler, + endorsement_handler::{cache::EndorsementCache, EndorsementHandler}, operation_handler::{cache::OperationCache, OperationHandler}, peer_handler::{fallback_function, MassaHandshake, PeerManagementHandler}, }, @@ -53,6 +53,7 @@ pub fn start_connectivity_thread( let (sender_blocks_ext, receiver_blocks_ext) = unbounded(); let handle = std::thread::spawn({ + let sender_endorsements_ext = sender_endorsements_ext.clone(); let sender_operations_ext = sender_operations_ext.clone(); move || { let mut peer_management_handler = PeerManagementHandler::new(initial_peers); @@ -89,6 +90,10 @@ pub fn start_connectivity_thread( NonZeroUsize::new(usize::MAX).unwrap(), NonZeroUsize::new(usize::MAX).unwrap(), ))); + let endorsement_cache = Arc::new(RwLock::new(EndorsementCache::new( + NonZeroUsize::new(usize::MAX).unwrap(), + NonZeroUsize::new(usize::MAX).unwrap(), + ))); // Start handlers let mut operation_handler = OperationHandler::new( @@ -103,9 +108,12 @@ pub fn start_connectivity_thread( ); let mut endorsement_handler = EndorsementHandler::new( pool_controller, + endorsement_cache, storage, + config.clone(), manager.active_connections.clone(), receiver_endorsements, + sender_endorsements_ext, receiver_endorsements_ext, ); let mut block_handler = BlockHandler::new( diff --git a/massa-protocol-worker-2/src/controller.rs b/massa-protocol-worker-2/src/controller.rs index 5664d4f5942..5b1fba6d169 100644 --- a/massa-protocol-worker-2/src/controller.rs +++ b/massa-protocol-worker-2/src/controller.rs @@ -9,7 +9,7 @@ use massa_storage::Storage; use crate::handlers::{ block_handler::commands::BlockHandlerCommand, - endorsement_handler::commands::EndorsementHandlerCommand, + endorsement_handler::commands_propagation::EndorsementHandlerCommand, operation_handler::commands_propagation::OperationHandlerCommand, }; diff --git a/massa-protocol-worker-2/src/handlers/endorsement_handler/cache.rs b/massa-protocol-worker-2/src/handlers/endorsement_handler/cache.rs new file mode 100644 index 00000000000..7b2f110f2ef --- /dev/null +++ b/massa-protocol-worker-2/src/handlers/endorsement_handler/cache.rs @@ -0,0 +1,29 @@ +use std::{num::NonZeroUsize, sync::Arc}; + +use lru::LruCache; +use massa_models::endorsement::EndorsementId; +use parking_lot::RwLock; +use peernet::peer_id::PeerId; + +pub struct EndorsementCache { + pub checked_endorsements: LruCache, + pub endorsements_known_by_peer: LruCache>, +} + +impl EndorsementCache { + pub fn new( + max_known_endorsements: NonZeroUsize, + max_known_endorsements_by_peer: NonZeroUsize, + ) -> Self { + Self { + checked_endorsements: LruCache::new(max_known_endorsements), + endorsements_known_by_peer: LruCache::new(max_known_endorsements_by_peer), + } + } + + pub fn insert_checked_endorsement(&mut self, endorsement_id: EndorsementId) { + self.checked_endorsements.put(endorsement_id, ()); + } +} + +pub type SharedEndorsementCache = Arc>; diff --git a/massa-protocol-worker-2/src/handlers/endorsement_handler/commands.rs b/massa-protocol-worker-2/src/handlers/endorsement_handler/commands_propagation.rs similarity index 50% rename from massa-protocol-worker-2/src/handlers/endorsement_handler/commands.rs rename to massa-protocol-worker-2/src/handlers/endorsement_handler/commands_propagation.rs index 53abc8d4e60..d43d6e67575 100644 --- a/massa-protocol-worker-2/src/handlers/endorsement_handler/commands.rs +++ b/massa-protocol-worker-2/src/handlers/endorsement_handler/commands_propagation.rs @@ -1,8 +1,6 @@ use massa_storage::Storage; -/// Commands that the endorsement handler can process -#[derive(Debug)] pub enum EndorsementHandlerCommand { - /// Propagate endorsements + // Storage that contains endorsements to propagate PropagateEndorsements(Storage), } diff --git a/massa-protocol-worker-2/src/handlers/endorsement_handler/internal_messages.rs b/massa-protocol-worker-2/src/handlers/endorsement_handler/internal_messages.rs deleted file mode 100644 index 961eddc1bc7..00000000000 --- a/massa-protocol-worker-2/src/handlers/endorsement_handler/internal_messages.rs +++ /dev/null @@ -1,7 +0,0 @@ -use massa_models::endorsement::SecureShareEndorsement; -use peernet::peer_id::PeerId; - -pub enum InternalMessage { - /// (From peer id, endorsements) - PropagateEndorsements((PeerId, Vec)), -} diff --git a/massa-protocol-worker-2/src/handlers/endorsement_handler/mod.rs b/massa-protocol-worker-2/src/handlers/endorsement_handler/mod.rs index ddab3ba6ba5..c08d7e4750c 100644 --- a/massa-protocol-worker-2/src/handlers/endorsement_handler/mod.rs +++ b/massa-protocol-worker-2/src/handlers/endorsement_handler/mod.rs @@ -1,23 +1,26 @@ use std::thread::JoinHandle; -use crossbeam::channel::{unbounded, Receiver}; +use crossbeam::channel::{Receiver, Sender}; use massa_pool_exports::PoolController; +use massa_protocol_exports_2::ProtocolConfig; use massa_storage::Storage; -use peernet::{network_manager::SharedActiveConnections, peer_id::PeerId}; +use peernet::network_manager::SharedActiveConnections; use self::{ - commands::EndorsementHandlerCommand, propagation::start_propagation_thread, - retrieval::start_retrieval_thread, + cache::SharedEndorsementCache, commands_propagation::EndorsementHandlerCommand, + propagation::start_propagation_thread, retrieval::start_retrieval_thread, }; -pub mod commands; -mod internal_messages; +pub mod cache; +pub mod commands_propagation; mod messages; mod propagation; mod retrieval; pub(crate) use messages::{EndorsementMessage, EndorsementMessageSerializer}; +use super::peer_handler::models::PeerMessageTuple; + pub struct EndorsementHandler { pub endorsement_retrieval_thread: Option>, pub endorsement_propagation_thread: Option>, @@ -26,23 +29,25 @@ pub struct EndorsementHandler { impl EndorsementHandler { pub fn new( pool_controller: Box, + cache: SharedEndorsementCache, storage: Storage, + config: ProtocolConfig, active_connections: SharedActiveConnections, - receiver: Receiver<(PeerId, u64, Vec)>, - receiver_ext: Receiver, + receiver: Receiver, + local_sender: Sender, + local_receiver: Receiver, ) -> Self { - //TODO: Define bound channel - let (internal_sender, internal_receiver) = unbounded(); let endorsement_retrieval_thread = start_retrieval_thread( receiver, - receiver_ext, + local_sender, + cache.clone(), pool_controller, - storage, - internal_sender, + config.clone(), + storage.clone_without_refs(), ); let endorsement_propagation_thread = - start_propagation_thread(internal_receiver, active_connections); + start_propagation_thread(local_receiver, cache, config, active_connections); Self { endorsement_retrieval_thread: Some(endorsement_retrieval_thread), endorsement_propagation_thread: Some(endorsement_propagation_thread), diff --git a/massa-protocol-worker-2/src/handlers/endorsement_handler/propagation.rs b/massa-protocol-worker-2/src/handlers/endorsement_handler/propagation.rs index bc0b9249f3e..273c7ac3d06 100644 --- a/massa-protocol-worker-2/src/handlers/endorsement_handler/propagation.rs +++ b/massa-protocol-worker-2/src/handlers/endorsement_handler/propagation.rs @@ -1,80 +1,155 @@ -use std::{collections::HashMap, thread::JoinHandle}; +use std::{num::NonZeroUsize, thread::JoinHandle}; use crossbeam::channel::Receiver; -use massa_models::{endorsement::EndorsementId, prehash::PreHashSet}; +use lru::LruCache; +use massa_models::{ + endorsement::{EndorsementId, SecureShareEndorsement}, + prehash::{PreHashMap, PreHashSet}, +}; +use massa_protocol_exports_2::ProtocolConfig; use peernet::{network_manager::SharedActiveConnections, peer_id::PeerId}; +use tracing::log::warn; -use crate::{ - handlers::endorsement_handler::messages::EndorsementMessage, messages::MessagesSerializer, -}; +use crate::messages::MessagesSerializer; -use super::{internal_messages::InternalMessage, messages::EndorsementMessageSerializer}; +use super::{ + cache::SharedEndorsementCache, commands_propagation::EndorsementHandlerCommand, + messages::EndorsementMessageSerializer, EndorsementMessage, +}; struct PropagationThread { - //TODO: Add pruning - cache_by_peer: HashMap>, + receiver: Receiver, + config: ProtocolConfig, + cache: SharedEndorsementCache, + active_connections: SharedActiveConnections, + endorsement_serializer: MessagesSerializer, } -pub fn start_propagation_thread( - internal_receiver: Receiver, - active_connections: SharedActiveConnections, -) -> JoinHandle<()> { - //TODO: Here and everywhere add id to threads - std::thread::spawn(move || { - let endorsement_serializer = MessagesSerializer::new() - .with_endorsement_message_serializer(EndorsementMessageSerializer::new()); - let mut propagation_thread = PropagationThread { - cache_by_peer: HashMap::new(), - }; +impl PropagationThread { + fn run(&mut self) { loop { - match internal_receiver.recv() { - Ok(internal_message) => { - match internal_message { - //TODO: Batch with timer 0 - InternalMessage::PropagateEndorsements((from_peer_id, endorsements)) => { - // Add endorsements received as known by the sender peer - let cached_endorsements = propagation_thread - .cache_by_peer - .entry(from_peer_id) - .or_insert(PreHashSet::default()); - cached_endorsements - .extend(endorsements.iter().map(|endorsement| endorsement.id)); - - // Send the endorsements to all connected peers - let active_connections = active_connections.read(); - for (peer_id, connection) in active_connections.connections.iter() { - // Filter endorsements already known by the peer - let mut endorsements = endorsements.clone(); - if let Some(cached_endorsements) = - propagation_thread.cache_by_peer.get_mut(peer_id) + match self.receiver.recv() { + Ok(msg) => { + match msg { + EndorsementHandlerCommand::PropagateEndorsements(mut endorsements) => { + // IMPORTANT: This is there to batch all "waiting to propagate endorsements" but will not work anymore if there is + // other variants in EndorsementHandlerCommand + while let Ok(msg) = self.receiver.try_recv() { + match msg { + EndorsementHandlerCommand::PropagateEndorsements( + endorsements2, + ) => { + endorsements.extend(endorsements2); + } + } + } + let endorsements_ids: PreHashSet = endorsements + .get_endorsement_refs() + .iter() + .copied() + .collect(); + { + let mut cache_write = self.cache.write(); + for endorsement_id in endorsements_ids.iter().copied() { + cache_write.insert_checked_endorsement(endorsement_id); + } + // Add peers that potentially don't exist in cache { - endorsements.retain(|endorsement| { - if cached_endorsements.contains(&endorsement.id) { - false - } else { - cached_endorsements.insert(endorsement.id); - true + let active_connections_read = self.active_connections.read(); + for peer_id in active_connections_read.connections.keys() { + cache_write.endorsements_known_by_peer.put(peer_id.clone(), LruCache::new(NonZeroUsize::new(self.config.max_node_known_endorsements_size).expect("max_node_known_endorsements_size in config is > 0"))); + } + } + let peers: Vec = cache_write + .endorsements_known_by_peer + .iter() + .map(|(id, _)| id.clone()) + .collect(); + // Clean shared cache if peers do not exist anymore + { + let active_connections_read = self.active_connections.read(); + for peer_id in peers { + if !active_connections_read + .connections + .contains_key(&peer_id) + { + cache_write.endorsements_known_by_peer.pop(&peer_id); } - }); + } + } + for (peer_id, endorsement_ids) in + cache_write.endorsements_known_by_peer.iter_mut() + { + let new_endorsements: PreHashMap< + EndorsementId, + SecureShareEndorsement, + > = { + let endorsements_reader = endorsements.read_endorsements(); + endorsements + .get_endorsement_refs() + .iter() + .filter_map(|id| { + if endorsement_ids.contains(id) { + return None; + } + Some(( + *id, + endorsements_reader.get(id).cloned().unwrap(), + )) + }) + .collect() + }; + for endorsement_id in new_endorsements.keys().copied() { + endorsement_ids.put(endorsement_id, ()); + } + let to_send = + new_endorsements.into_values().collect::>(); + if !to_send.is_empty() { + let active_connections_read = + self.active_connections.read(); + if let Some(peer) = + active_connections_read.connections.get(peer_id) + { + if let Err(err) = peer.send_channels.send( + &self.endorsement_serializer, + EndorsementMessage::Endorsements(to_send).into(), + false, + ) { + warn!("could not send endorsements batch to node {}: {}", peer_id, err); + } + } + } } - - // Send the endorsements - let message = EndorsementMessage::Endorsements(endorsements); - println!("Sending message to {:?}", peer_id); - // TODO: Error management - connection - .send_channels - .send(&endorsement_serializer, message.into(), false) - .unwrap(); } } } } Err(err) => { - println!("Error: {:?}", err); + println!("Err: {:#?}", err); return; } } } + } +} + +pub fn start_propagation_thread( + receiver: Receiver, + cache: SharedEndorsementCache, + config: ProtocolConfig, + active_connections: SharedActiveConnections, +) -> JoinHandle<()> { + //TODO: Here and everywhere add id to threads + std::thread::spawn(move || { + let endorsement_serializer = MessagesSerializer::new() + .with_endorsement_message_serializer(EndorsementMessageSerializer::new()); + let mut propagation_thread = PropagationThread { + receiver, + config, + active_connections, + cache, + endorsement_serializer, + }; + propagation_thread.run(); }) } diff --git a/massa-protocol-worker-2/src/handlers/endorsement_handler/retrieval.rs b/massa-protocol-worker-2/src/handlers/endorsement_handler/retrieval.rs index 309c99609d6..210be2cb71d 100644 --- a/massa-protocol-worker-2/src/handlers/endorsement_handler/retrieval.rs +++ b/massa-protocol-worker-2/src/handlers/endorsement_handler/retrieval.rs @@ -1,30 +1,30 @@ -use std::thread::JoinHandle; +use std::{num::NonZeroUsize, thread::JoinHandle}; -use crossbeam::{ - channel::{Receiver, Sender}, - select, -}; +use crossbeam::channel::{Receiver, Sender}; +use lru::LruCache; use massa_models::{endorsement::EndorsementId, prehash::PreHashSet}; use massa_pool_exports::PoolController; +use massa_protocol_exports_2::ProtocolConfig; use massa_serialization::{DeserializeError, Deserializer}; use massa_storage::Storage; -use peernet::peer_id::PeerId; -use crate::handlers::endorsement_handler::messages::EndorsementMessage; +use crate::handlers::{ + endorsement_handler::messages::EndorsementMessage, peer_handler::models::PeerMessageTuple, +}; use super::{ - commands::EndorsementHandlerCommand, - internal_messages::InternalMessage, + cache::SharedEndorsementCache, + commands_propagation::EndorsementHandlerCommand, messages::{EndorsementMessageDeserializer, EndorsementMessageDeserializerArgs}, }; pub struct RetrievalThread { - receiver: Receiver<(PeerId, u64, Vec)>, - receiver_ext: Receiver, - cached_endorsement_ids: PreHashSet, + receiver: Receiver, + cache: SharedEndorsementCache, + internal_sender: Sender, pool_controller: Box, + config: ProtocolConfig, storage: Storage, - internal_sender: Sender, } impl RetrievalThread { @@ -37,78 +37,82 @@ impl RetrievalThread { endorsement_count: 32, }); loop { - select! { - recv(self.receiver) -> msg => { - match msg { - Ok((peer_id, message_id, message)) => { - endorsement_message_deserializer.set_message_id(message_id); - let (rest, message) = endorsement_message_deserializer - .deserialize::(&message) - .unwrap(); - if !rest.is_empty() { - println!("Error: message not fully consumed"); - return; - } - match message { - EndorsementMessage::Endorsements(mut endorsements) => { - // Retain endorsements not in cache in endorsement vec and add them - endorsements.retain(|endorsement| { - if self.cached_endorsement_ids.contains(&endorsement.id) { - false - } else { - self.cached_endorsement_ids.insert(endorsement.id); - true - } - }); - let mut endorsements_storage = self.storage.clone_without_refs(); - endorsements_storage.store_endorsements(endorsements.clone()); - self.pool_controller.add_endorsements(endorsements_storage); - self.internal_sender - .send(InternalMessage::PropagateEndorsements(( - peer_id, - endorsements, - ))) - .unwrap(); + match self.receiver.recv() { + Ok((peer_id, message_id, message)) => { + endorsement_message_deserializer.set_message_id(message_id); + let (rest, message) = endorsement_message_deserializer + .deserialize::(&message) + .unwrap(); + if !rest.is_empty() { + println!("Error: message not fully consumed"); + return; + } + match message { + EndorsementMessage::Endorsements(mut endorsements) => { + // Retain endorsements not in cache in endorsement vec and add them + { + let ids: PreHashSet = endorsements + .iter() + .map(|endorsement| endorsement.id) + .collect(); + let mut cache_write = self.cache.write(); + // Add endorsements known for this peer + let cached_endorsements = cache_write + .endorsements_known_by_peer + .get_or_insert_mut(peer_id, || LruCache::new(NonZeroUsize::new(self.config.max_node_known_endorsements_size).expect("max_node_known_endorsements_size in config should be > 0"))); + for id in ids { + cached_endorsements.put(id, ()); } + endorsements.retain(|endorsement| { + if cache_write + .checked_endorsements + .get(&endorsement.id) + .is_some() + { + false + } else { + cache_write.insert_checked_endorsement(endorsement.id); + true + } + }); } - } - Err(err) => { - println!("Error: {:?}", err); - return; - } - } - }, - recv(self.receiver_ext) -> command => { - match command { - Ok(command) => { - println!("Received command: {:?}", command); - } - Err(err) => { - println!("Error: {:?}", err); - return; + let mut endorsements_storage = self.storage.clone_without_refs(); + endorsements_storage.store_endorsements(endorsements.clone()); + self.pool_controller + .add_endorsements(endorsements_storage.clone()); + self.internal_sender + .send(EndorsementHandlerCommand::PropagateEndorsements( + endorsements_storage, + )) + .unwrap(); } } } + Err(err) => { + println!("Error: {:?}", err); + return; + } } } } } pub fn start_retrieval_thread( - receiver: Receiver<(PeerId, u64, Vec)>, - receiver_ext: Receiver, + receiver: Receiver, + internal_sender: Sender, + cache: SharedEndorsementCache, pool_controller: Box, + config: ProtocolConfig, storage: Storage, - internal_sender: Sender, ) -> JoinHandle<()> { std::thread::spawn(move || { let mut retrieval_thread = RetrievalThread { receiver, - receiver_ext, - cached_endorsement_ids: PreHashSet::default(), + cache, + internal_sender, pool_controller, + config, storage, - internal_sender, }; retrieval_thread.run(); }) diff --git a/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs b/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs index 359307940e5..fcdc384e8ac 100644 --- a/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs +++ b/massa-protocol-worker-2/src/handlers/operation_handler/propagation.rs @@ -1,6 +1,7 @@ -use std::{mem, thread::JoinHandle}; +use std::{mem, num::NonZeroUsize, thread::JoinHandle}; use crossbeam::channel::{Receiver, RecvTimeoutError}; +use lru::LruCache; use massa_logging::massa_trace; use massa_models::operation::OperationId; use massa_protocol_exports_2::ProtocolConfig; @@ -90,6 +91,19 @@ impl PropagationThread { } } } + // Add new potential peers + { + let active_connections_read = self.active_connections.read(); + for peer_id in active_connections_read.connections.keys() { + cache_write.ops_known_by_peer.put( + peer_id.clone(), + LruCache::new( + NonZeroUsize::new(self.config.max_node_known_ops_size) + .expect("max_node_known_endorsements_size in config is > 0"), + ), + ); + } + } // Propagate to peers for (peer_id, ops) in cache_write.ops_known_by_peer.iter_mut() { let new_ops: Vec = operation_ids From d7c5fbc66c7c98d38636924272d760a9710a2d43 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Mon, 17 Apr 2023 09:50:09 +0200 Subject: [PATCH 11/11] Remove unwrap and change print to warn. --- .../src/handlers/endorsement_handler/propagation.rs | 5 ++++- .../src/handlers/endorsement_handler/retrieval.rs | 11 +++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/massa-protocol-worker-2/src/handlers/endorsement_handler/propagation.rs b/massa-protocol-worker-2/src/handlers/endorsement_handler/propagation.rs index 273c7ac3d06..94b8d91bbfa 100644 --- a/massa-protocol-worker-2/src/handlers/endorsement_handler/propagation.rs +++ b/massa-protocol-worker-2/src/handlers/endorsement_handler/propagation.rs @@ -125,7 +125,10 @@ impl PropagationThread { } } Err(err) => { - println!("Err: {:#?}", err); + warn!( + "Error in propagation thread of endorsement handler: {:#?}", + err + ); return; } } diff --git a/massa-protocol-worker-2/src/handlers/endorsement_handler/retrieval.rs b/massa-protocol-worker-2/src/handlers/endorsement_handler/retrieval.rs index 210be2cb71d..c64c7fe5336 100644 --- a/massa-protocol-worker-2/src/handlers/endorsement_handler/retrieval.rs +++ b/massa-protocol-worker-2/src/handlers/endorsement_handler/retrieval.rs @@ -7,6 +7,7 @@ use massa_pool_exports::PoolController; use massa_protocol_exports_2::ProtocolConfig; use massa_serialization::{DeserializeError, Deserializer}; use massa_storage::Storage; +use tracing::warn; use crate::handlers::{ endorsement_handler::messages::EndorsementMessage, peer_handler::models::PeerMessageTuple, @@ -80,11 +81,13 @@ impl RetrievalThread { endorsements_storage.store_endorsements(endorsements.clone()); self.pool_controller .add_endorsements(endorsements_storage.clone()); - self.internal_sender - .send(EndorsementHandlerCommand::PropagateEndorsements( + if let Err(err) = self.internal_sender.send( + EndorsementHandlerCommand::PropagateEndorsements( endorsements_storage, - )) - .unwrap(); + ), + ) { + warn!("Failed to send from retrieval thread of endorsement handler to propagation: {:?}", err); + } } } }