Skip to content

Commit

Permalink
Refactor RPC and network events (#530)
Browse files Browse the repository at this point in the history
* initial refactor with local version

* cleanup and switch to using forked repo

* cargo update for protobuf gen

* Remove other dead code

* Remove RPCEvent
  • Loading branch information
austinabell authored Jul 2, 2020
1 parent 2d88d06 commit 0facf1b
Show file tree
Hide file tree
Showing 26 changed files with 500 additions and 1,209 deletions.
178 changes: 92 additions & 86 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion blockchain/chain_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ blocks = { package = "forest_blocks", path = "../blocks" }
beacon = { path = "../beacon" }
db = { path = "../../node/db" }
encoding = { package = "forest_encoding", path = "../../encoding" }
libp2p = "0.20"
libp2p = { git = "https://github.com/ChainSafe/rust-libp2p", rev = "3661071120d3c07eb9e56932204efcd12cdffc49" }
cid = { package = "forest_cid", path = "../../ipld/cid" }
ipld_blockstore = { path = "../../ipld/blockstore" }
chain = { path = "../chain" }
Expand Down
34 changes: 19 additions & 15 deletions blockchain/chain_sync/src/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use async_std::sync::{Receiver, Sender};
use blocks::{FullTipset, Tipset, TipsetKeys};
use forest_libp2p::{
blocksync::{BlockSyncRequest, BlockSyncResponse, BLOCKS, MESSAGES},
hello::HelloMessage,
rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId},
hello::HelloRequest,
rpc::{RPCRequest, RPCResponse, RequestId},
NetworkEvent, NetworkMessage,
};
use libp2p::core::PeerId;
Expand Down Expand Up @@ -44,7 +44,7 @@ impl SyncNetworkContext {
network_send,
rpc_receiver,
receiver,
request_id: 1,
request_id: RequestId(1),
}
}

Expand Down Expand Up @@ -111,22 +111,33 @@ impl SyncNetworkContext {
}

/// Send a hello request to the network (does not await response)
pub async fn hello_request(&self, peer_id: PeerId, request: HelloMessage) {
pub async fn hello_request(&mut self, peer_id: PeerId, request: HelloRequest) {
trace!("Sending Hello Message {:?}", request);
// TODO update to await response when we want to handle the latency
self.send_rpc_event(peer_id, RPCEvent::Request(0, RPCRequest::Hello(request)))
self.network_send
.send(NetworkMessage::RPC {
peer_id,
request: RPCRequest::Hello(request),
id: self.request_id,
})
.await;
self.request_id.0 += 1;
}

/// Send any RPC request to the network and await the response
pub async fn send_rpc_request(
&mut self,
peer_id: PeerId,
rpc_request: RPCRequest,
request: RPCRequest,
) -> Result<RPCResponse, &'static str> {
let request_id = self.request_id;
self.request_id += 1;
self.send_rpc_event(peer_id, RPCEvent::Request(request_id, rpc_request))
self.request_id.0 += 1;
self.network_send
.send(NetworkMessage::RPC {
peer_id,
request,
id: request_id,
})
.await;
loop {
match future::timeout(Duration::from_secs(RPC_TIMEOUT), self.rpc_receiver.next()).await
Expand All @@ -142,11 +153,4 @@ impl SyncNetworkContext {
}
}
}

/// Handles sending the base event to the network service
async fn send_rpc_event(&self, peer_id: PeerId, event: RPCEvent) {
self.network_send
.send(NetworkMessage::RPC { peer_id, event })
.await
}
}
13 changes: 9 additions & 4 deletions blockchain/chain_sync/src/network_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,20 @@ impl NetworkHandler {
loop {
match receiver.next().await {
// Handle specifically RPC responses and send to that channel
Some(NetworkEvent::RPCResponse { req_id, response }) => {
rpc_send.send((req_id, response)).await
Some(NetworkEvent::BlockSyncResponse {
request_id,
response,
}) => {
rpc_send
.send((request_id, RPCResponse::BlockSync(response)))
.await
}
// Pass any non RPC responses through event channel
Some(event) => {
// Update peer on this thread before sending hello
if let NetworkEvent::Hello { source, .. } = &event {
if let NetworkEvent::HelloRequest { channel, .. } = &event {
// TODO should probably add peer with their tipset/ not handled seperately
peer_manager.add_peer(source.clone(), None).await;
peer_manager.add_peer(channel.peer.clone(), None).await;
}

// TODO revisit, doing this to avoid blocking this thread but can handle better
Expand Down
21 changes: 11 additions & 10 deletions blockchain/chain_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use encoding::{Cbor, Error as EncodingError};
use fil_types::SectorInfo;
use filecoin_proofs_api::{post::verify_winning_post, ProverId, PublicReplicaInfo, SectorId};
use forest_libp2p::{
hello::HelloMessage, BlockSyncRequest, NetworkEvent, NetworkMessage, MESSAGES,
hello::HelloRequest, BlockSyncRequest, NetworkEvent, NetworkMessage, MESSAGES,
};
use futures::{
executor::block_on,
Expand Down Expand Up @@ -150,21 +150,22 @@ where

while let Some(event) = self.network.receiver.next().await {
match event {
NetworkEvent::Hello { source, message } => {
NetworkEvent::HelloRequest { request, channel } => {
let source = channel.peer.clone();
info!(
"Message inbound, heaviest tipset cid: {:?}",
message.heaviest_tip_set
request.heaviest_tip_set
);
match self
.fetch_tipset(
source.clone(),
&TipsetKeys::new(message.heaviest_tip_set.clone()),
&TipsetKeys::new(request.heaviest_tip_set.clone()),
)
.await
{
Ok(fts) => {
if self.inform_new_head(source.clone(), &fts).await.is_err() {
warn!("Failed to sync with provided tipset",);
if let Err(e) = self.inform_new_head(source.clone(), &fts).await {
warn!("Failed to sync with provided tipset: {}", e);
};
}
Err(e) => {
Expand All @@ -177,7 +178,7 @@ where
self.network
.hello_request(
peer_id,
HelloMessage {
HelloRequest {
heaviest_tip_set: heaviest.cids().to_vec(),
heaviest_tipset_height: heaviest.epoch(),
heaviest_tipset_weight: heaviest.weight().clone(),
Expand Down Expand Up @@ -1023,7 +1024,7 @@ mod tests {
use beacon::MockBeacon;
use blocks::BlockHeader;
use db::MemoryDB;
use forest_libp2p::NetworkEvent;
use forest_libp2p::{rpc::RequestId, NetworkEvent};
use std::sync::Arc;
use test_utils::{construct_blocksync_response, construct_messages, construct_tipset};

Expand Down Expand Up @@ -1059,9 +1060,9 @@ mod tests {

task::block_on(async {
event_sender
.send(NetworkEvent::RPCResponse {
.send(NetworkEvent::BlockSyncResponse {
// TODO update this, only matching first index of requestId
req_id: 1,
request_id: RequestId(1),
response: rpc_response,
})
.await;
Expand Down
13 changes: 9 additions & 4 deletions blockchain/chain_sync/src/sync/peer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use async_std::task;
use beacon::MockBeacon;
use blocks::BlockHeader;
use db::MemoryDB;
use forest_libp2p::hello::HelloMessage;
use forest_libp2p::{hello::HelloRequest, rpc::ResponseChannel};
use futures::channel::oneshot;
use libp2p::core::PeerId;
use std::time::Duration;

Expand Down Expand Up @@ -48,12 +49,16 @@ fn peer_manager_update() {

let source = PeerId::random();
let source_clone = source.clone();
let (sender, _) = oneshot::channel();

task::block_on(async {
event_sender
.send(NetworkEvent::Hello {
message: HelloMessage::default(),
source,
.send(NetworkEvent::HelloRequest {
request: HelloRequest::default(),
channel: ResponseChannel {
peer: source,
sender,
},
})
.await;

Expand Down
2 changes: 1 addition & 1 deletion forest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2018"
forest_libp2p = { path = "../node/forest_libp2p" }
utils = { path = "../node/utils" }
db = { path = "../node/db", features = ["rocksdb"] }
libp2p = "0.20"
libp2p = { git = "https://github.com/ChainSafe/rust-libp2p", rev = "3661071120d3c07eb9e56932204efcd12cdffc49" }
futures = "0.3.5"
log = "0.4.8"
async-log = "2.0.0"
Expand Down
2 changes: 1 addition & 1 deletion ipld/graphsync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ cid = { package = "forest_cid", path = "../cid", version = "0.1" }
forest_ipld = { path = "../" }
fnv = "1.0.6"
forest_encoding = { path = "../../encoding", version = "0.1" }
libp2p = "0.20"
libp2p = { git = "https://github.com/ChainSafe/rust-libp2p", rev = "3661071120d3c07eb9e56932204efcd12cdffc49" }
futures = "0.3.5"
futures-util = "0.3.5"
futures_codec = "0.4.0"
Expand Down
12 changes: 6 additions & 6 deletions ipld/graphsync/src/message/proto/message.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.15.0. Do not edit
// This file is generated by rust-protobuf 2.15.1. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
Expand All @@ -25,7 +25,7 @@ use protobuf::ProtobufEnum as ProtobufEnum_imported_for_functions;

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_15_0;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_15_1;

#[derive(PartialEq,Clone,Default)]
pub struct Message {
Expand Down Expand Up @@ -254,7 +254,7 @@ impl ::protobuf::Message for Message {
fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
self as &mut dyn (::std::any::Any)
}
fn into_any(self: Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
self
}

Expand Down Expand Up @@ -618,7 +618,7 @@ impl ::protobuf::Message for Message_Request {
fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
self as &mut dyn (::std::any::Any)
}
fn into_any(self: Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
self
}

Expand Down Expand Up @@ -870,7 +870,7 @@ impl ::protobuf::Message for Message_Response {
fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
self as &mut dyn (::std::any::Any)
}
fn into_any(self: Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
self
}

Expand Down Expand Up @@ -1081,7 +1081,7 @@ impl ::protobuf::Message for Message_Block {
fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
self as &mut dyn (::std::any::Any)
}
fn into_any(self: Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
self
}

Expand Down
3 changes: 2 additions & 1 deletion node/forest_libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2018"

[dependencies]
utils = { path = "../utils" }
libp2p = "0.20"
libp2p = { git = "https://github.com/ChainSafe/rust-libp2p", rev = "3661071120d3c07eb9e56932204efcd12cdffc49" }
futures = "0.3.5"
futures-util = "0.3.5"
futures_codec = "0.4.0"
Expand All @@ -22,6 +22,7 @@ fnv = "1.0.6"
smallvec = "1.1.0"
clock = { path = "../clock" }
num-bigint = { path = "../../utils/bigint", package = "forest_bigint" }
async-trait = "0.1"

[dev-dependencies]
forest_address = { path = "../../vm/address" }
Expand Down
Loading

0 comments on commit 0facf1b

Please sign in to comment.