Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic endorsement handler code #3827

Merged
merged 13 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion massa-protocol-worker-2/src/connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
controller::ProtocolControllerImpl,
handlers::{
block_handler::BlockHandler,
endorsement_handler::EndorsementHandler,
endorsement_handler::{cache::EndorsementCache, EndorsementHandler},
operation_handler::{cache::OperationCache, OperationHandler},
peer_handler::{fallback_function, MassaHandshake, PeerManagementHandler},
},
Expand Down Expand Up @@ -53,6 +53,7 @@ pub fn start_connectivity_thread(
let (sender_blocks_ext, receiver_blocks_ext) = unbounded();

let handle = std::thread::spawn({
let sender_endorsements_ext = sender_endorsements_ext.clone();
let sender_operations_ext = sender_operations_ext.clone();
move || {
let mut peer_management_handler = PeerManagementHandler::new(initial_peers);
Expand Down Expand Up @@ -89,6 +90,10 @@ pub fn start_connectivity_thread(
NonZeroUsize::new(usize::MAX).unwrap(),
NonZeroUsize::new(usize::MAX).unwrap(),
)));
let endorsement_cache = Arc::new(RwLock::new(EndorsementCache::new(
NonZeroUsize::new(usize::MAX).unwrap(),
NonZeroUsize::new(usize::MAX).unwrap(),
)));

// Start handlers
let mut operation_handler = OperationHandler::new(
Expand All @@ -104,9 +109,12 @@ pub fn start_connectivity_thread(
);
let mut endorsement_handler = EndorsementHandler::new(
pool_controller,
endorsement_cache,
storage,
config.clone(),
manager.active_connections.clone(),
receiver_endorsements,
sender_endorsements_ext,
receiver_endorsements_ext,
);
let mut block_handler = BlockHandler::new(
Expand Down
2 changes: 1 addition & 1 deletion massa-protocol-worker-2/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use massa_storage::Storage;

use crate::handlers::{
block_handler::commands::BlockHandlerCommand,
endorsement_handler::commands::EndorsementHandlerCommand,
endorsement_handler::commands_propagation::EndorsementHandlerCommand,
operation_handler::commands_propagation::OperationHandlerCommand,
};

Expand Down
29 changes: 29 additions & 0 deletions massa-protocol-worker-2/src/handlers/endorsement_handler/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::{num::NonZeroUsize, sync::Arc};

use lru::LruCache;
use massa_models::endorsement::EndorsementId;
use parking_lot::RwLock;
use peernet::peer_id::PeerId;

pub struct EndorsementCache {
pub checked_endorsements: LruCache<EndorsementId, ()>,
pub endorsements_known_by_peer: LruCache<PeerId, LruCache<EndorsementId, ()>>,
}

impl EndorsementCache {
pub fn new(
max_known_endorsements: NonZeroUsize,
max_known_endorsements_by_peer: NonZeroUsize,
) -> Self {
Self {
checked_endorsements: LruCache::new(max_known_endorsements),
endorsements_known_by_peer: LruCache::new(max_known_endorsements_by_peer),
}
}

pub fn insert_checked_endorsement(&mut self, endorsement_id: EndorsementId) {
self.checked_endorsements.put(endorsement_id, ());
}
}

pub type SharedEndorsementCache = Arc<RwLock<EndorsementCache>>;
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use massa_storage::Storage;

/// Commands that the endorsement handler can process
#[derive(Debug)]
pub enum EndorsementHandlerCommand {
/// Propagate endorsements
// Storage that contains endorsements to propagate
PropagateEndorsements(Storage),
}

This file was deleted.

33 changes: 19 additions & 14 deletions massa-protocol-worker-2/src/handlers/endorsement_handler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
use std::thread::JoinHandle;

use crossbeam::channel::{unbounded, Receiver};
use crossbeam::channel::{Receiver, Sender};
use massa_pool_exports::PoolController;
use massa_protocol_exports_2::ProtocolConfig;
use massa_storage::Storage;
use peernet::{network_manager::SharedActiveConnections, peer_id::PeerId};
use peernet::network_manager::SharedActiveConnections;

use self::{
commands::EndorsementHandlerCommand, propagation::start_propagation_thread,
retrieval::start_retrieval_thread,
cache::SharedEndorsementCache, commands_propagation::EndorsementHandlerCommand,
propagation::start_propagation_thread, retrieval::start_retrieval_thread,
};

pub mod commands;
mod internal_messages;
pub mod cache;
pub mod commands_propagation;
mod messages;
mod propagation;
mod retrieval;

pub(crate) use messages::{EndorsementMessage, EndorsementMessageSerializer};

use super::peer_handler::models::PeerMessageTuple;

pub struct EndorsementHandler {
pub endorsement_retrieval_thread: Option<JoinHandle<()>>,
pub endorsement_propagation_thread: Option<JoinHandle<()>>,
Expand All @@ -26,23 +29,25 @@ pub struct EndorsementHandler {
impl EndorsementHandler {
pub fn new(
pool_controller: Box<dyn PoolController>,
cache: SharedEndorsementCache,
storage: Storage,
config: ProtocolConfig,
active_connections: SharedActiveConnections,
receiver: Receiver<(PeerId, u64, Vec<u8>)>,
receiver_ext: Receiver<EndorsementHandlerCommand>,
receiver: Receiver<PeerMessageTuple>,
local_sender: Sender<EndorsementHandlerCommand>,
local_receiver: Receiver<EndorsementHandlerCommand>,
) -> Self {
//TODO: Define bound channel
let (internal_sender, internal_receiver) = unbounded();
let endorsement_retrieval_thread = start_retrieval_thread(
receiver,
receiver_ext,
local_sender,
cache.clone(),
pool_controller,
storage,
internal_sender,
config.clone(),
storage.clone_without_refs(),
);

let endorsement_propagation_thread =
start_propagation_thread(internal_receiver, active_connections);
start_propagation_thread(local_receiver, cache, config, active_connections);
Self {
endorsement_retrieval_thread: Some(endorsement_retrieval_thread),
endorsement_propagation_thread: Some(endorsement_propagation_thread),
Expand Down
190 changes: 134 additions & 56 deletions massa-protocol-worker-2/src/handlers/endorsement_handler/propagation.rs
Original file line number Diff line number Diff line change
@@ -1,80 +1,158 @@
use std::{collections::HashMap, thread::JoinHandle};
use std::{num::NonZeroUsize, thread::JoinHandle};

use crossbeam::channel::Receiver;
use massa_models::{endorsement::EndorsementId, prehash::PreHashSet};
use lru::LruCache;
use massa_models::{
endorsement::{EndorsementId, SecureShareEndorsement},
prehash::{PreHashMap, PreHashSet},
};
use massa_protocol_exports_2::ProtocolConfig;
use peernet::{network_manager::SharedActiveConnections, peer_id::PeerId};
use tracing::log::warn;

use crate::{
handlers::endorsement_handler::messages::EndorsementMessage, messages::MessagesSerializer,
};
use crate::messages::MessagesSerializer;

use super::{internal_messages::InternalMessage, messages::EndorsementMessageSerializer};
use super::{
cache::SharedEndorsementCache, commands_propagation::EndorsementHandlerCommand,
messages::EndorsementMessageSerializer, EndorsementMessage,
};

struct PropagationThread {
//TODO: Add pruning
cache_by_peer: HashMap<PeerId, PreHashSet<EndorsementId>>,
receiver: Receiver<EndorsementHandlerCommand>,
config: ProtocolConfig,
cache: SharedEndorsementCache,
active_connections: SharedActiveConnections,
endorsement_serializer: MessagesSerializer,
}

pub fn start_propagation_thread(
internal_receiver: Receiver<InternalMessage>,
active_connections: SharedActiveConnections,
) -> JoinHandle<()> {
//TODO: Here and everywhere add id to threads
std::thread::spawn(move || {
let endorsement_serializer = MessagesSerializer::new()
.with_endorsement_message_serializer(EndorsementMessageSerializer::new());
let mut propagation_thread = PropagationThread {
cache_by_peer: HashMap::new(),
};
impl PropagationThread {
fn run(&mut self) {
loop {
match internal_receiver.recv() {
Ok(internal_message) => {
match internal_message {
//TODO: Batch with timer 0
InternalMessage::PropagateEndorsements((from_peer_id, endorsements)) => {
// Add endorsements received as known by the sender peer
let cached_endorsements = propagation_thread
.cache_by_peer
.entry(from_peer_id)
.or_insert(PreHashSet::default());
cached_endorsements
.extend(endorsements.iter().map(|endorsement| endorsement.id));

// Send the endorsements to all connected peers
let active_connections = active_connections.read();
for (peer_id, connection) in active_connections.connections.iter() {
// Filter endorsements already known by the peer
let mut endorsements = endorsements.clone();
if let Some(cached_endorsements) =
propagation_thread.cache_by_peer.get_mut(peer_id)
match self.receiver.recv() {
Ok(msg) => {
match msg {
EndorsementHandlerCommand::PropagateEndorsements(mut endorsements) => {
// IMPORTANT: This is there to batch all "waiting to propagate endorsements" but will not work anymore if there is
// other variants in EndorsementHandlerCommand
while let Ok(msg) = self.receiver.try_recv() {
match msg {
EndorsementHandlerCommand::PropagateEndorsements(
endorsements2,
) => {
endorsements.extend(endorsements2);
}
}
}
let endorsements_ids: PreHashSet<EndorsementId> = endorsements
.get_endorsement_refs()
.iter()
.copied()
.collect();
{
let mut cache_write = self.cache.write();
for endorsement_id in endorsements_ids.iter().copied() {
cache_write.insert_checked_endorsement(endorsement_id);
}
// Add peers that potentially don't exist in cache
{
endorsements.retain(|endorsement| {
if cached_endorsements.contains(&endorsement.id) {
false
} else {
cached_endorsements.insert(endorsement.id);
true
let active_connections_read = self.active_connections.read();
for peer_id in active_connections_read.connections.keys() {
cache_write.endorsements_known_by_peer.put(peer_id.clone(), LruCache::new(NonZeroUsize::new(self.config.max_node_known_endorsements_size).expect("max_node_known_endorsements_size in config is > 0")));
}
}
let peers: Vec<PeerId> = cache_write
.endorsements_known_by_peer
.iter()
.map(|(id, _)| id.clone())
.collect();
// Clean shared cache if peers do not exist anymore
{
let active_connections_read = self.active_connections.read();
for peer_id in peers {
if !active_connections_read
.connections
.contains_key(&peer_id)
{
cache_write.endorsements_known_by_peer.pop(&peer_id);
}
});
}
}
for (peer_id, endorsement_ids) in
cache_write.endorsements_known_by_peer.iter_mut()
{
let new_endorsements: PreHashMap<
EndorsementId,
SecureShareEndorsement,
> = {
let endorsements_reader = endorsements.read_endorsements();
endorsements
.get_endorsement_refs()
.iter()
.filter_map(|id| {
if endorsement_ids.contains(id) {
return None;
}
Some((
*id,
endorsements_reader.get(id).cloned().unwrap(),
))
})
.collect()
};
for endorsement_id in new_endorsements.keys().copied() {
endorsement_ids.put(endorsement_id, ());
}
let to_send =
new_endorsements.into_values().collect::<Vec<_>>();
if !to_send.is_empty() {
let active_connections_read =
self.active_connections.read();
if let Some(peer) =
active_connections_read.connections.get(peer_id)
{
if let Err(err) = peer.send_channels.send(
&self.endorsement_serializer,
EndorsementMessage::Endorsements(to_send).into(),
false,
) {
warn!("could not send endorsements batch to node {}: {}", peer_id, err);
}
}
}
}

// Send the endorsements
let message = EndorsementMessage::Endorsements(endorsements);
println!("Sending message to {:?}", peer_id);
// TODO: Error management
connection
.send_channels
.send(&endorsement_serializer, message.into(), false)
.unwrap();
}
}
}
}
Err(err) => {
println!("Error: {:?}", err);
warn!(
"Error in propagation thread of endorsement handler: {:#?}",
err
);
return;
}
}
}
}
}

pub fn start_propagation_thread(
receiver: Receiver<EndorsementHandlerCommand>,
cache: SharedEndorsementCache,
config: ProtocolConfig,
active_connections: SharedActiveConnections,
) -> JoinHandle<()> {
//TODO: Here and everywhere add id to threads
std::thread::spawn(move || {
let endorsement_serializer = MessagesSerializer::new()
.with_endorsement_message_serializer(EndorsementMessageSerializer::new());
let mut propagation_thread = PropagationThread {
receiver,
config,
active_connections,
cache,
endorsement_serializer,
};
propagation_thread.run();
})
}
Loading