Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor RPC and network events #530

Merged
merged 7 commits into from
Jul 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 92 additions & 86 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion blockchain/chain_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ blocks = { package = "forest_blocks", path = "../blocks" }
beacon = { path = "../beacon" }
db = { path = "../../node/db" }
encoding = { package = "forest_encoding", path = "../../encoding" }
libp2p = "0.20"
libp2p = { git = "https://github.com/ChainSafe/rust-libp2p", rev = "3661071120d3c07eb9e56932204efcd12cdffc49" }
cid = { package = "forest_cid", path = "../../ipld/cid" }
ipld_blockstore = { path = "../../ipld/blockstore" }
chain = { path = "../chain" }
Expand Down
34 changes: 19 additions & 15 deletions blockchain/chain_sync/src/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use async_std::sync::{Receiver, Sender};
use blocks::{FullTipset, Tipset, TipsetKeys};
use forest_libp2p::{
blocksync::{BlockSyncRequest, BlockSyncResponse, BLOCKS, MESSAGES},
hello::HelloMessage,
rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId},
hello::HelloRequest,
rpc::{RPCRequest, RPCResponse, RequestId},
NetworkEvent, NetworkMessage,
};
use libp2p::core::PeerId;
Expand Down Expand Up @@ -44,7 +44,7 @@ impl SyncNetworkContext {
network_send,
rpc_receiver,
receiver,
request_id: 1,
request_id: RequestId(1),
}
}

Expand Down Expand Up @@ -111,22 +111,33 @@ impl SyncNetworkContext {
}

/// Send a hello request to the network (does not await response)
pub async fn hello_request(&self, peer_id: PeerId, request: HelloMessage) {
pub async fn hello_request(&mut self, peer_id: PeerId, request: HelloRequest) {
trace!("Sending Hello Message {:?}", request);
// TODO update to await response when we want to handle the latency
self.send_rpc_event(peer_id, RPCEvent::Request(0, RPCRequest::Hello(request)))
self.network_send
.send(NetworkMessage::RPC {
peer_id,
request: RPCRequest::Hello(request),
id: self.request_id,
})
.await;
self.request_id.0 += 1;
}

/// Send any RPC request to the network and await the response
pub async fn send_rpc_request(
&mut self,
peer_id: PeerId,
rpc_request: RPCRequest,
request: RPCRequest,
) -> Result<RPCResponse, &'static str> {
let request_id = self.request_id;
self.request_id += 1;
self.send_rpc_event(peer_id, RPCEvent::Request(request_id, rpc_request))
self.request_id.0 += 1;
self.network_send
.send(NetworkMessage::RPC {
peer_id,
request,
id: request_id,
})
.await;
loop {
match future::timeout(Duration::from_secs(RPC_TIMEOUT), self.rpc_receiver.next()).await
Expand All @@ -142,11 +153,4 @@ impl SyncNetworkContext {
}
}
}

/// Handles sending the base event to the network service
async fn send_rpc_event(&self, peer_id: PeerId, event: RPCEvent) {
self.network_send
.send(NetworkMessage::RPC { peer_id, event })
.await
}
}
13 changes: 9 additions & 4 deletions blockchain/chain_sync/src/network_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,20 @@ impl NetworkHandler {
loop {
match receiver.next().await {
// Handle specifically RPC responses and send to that channel
Some(NetworkEvent::RPCResponse { req_id, response }) => {
rpc_send.send((req_id, response)).await
Some(NetworkEvent::BlockSyncResponse {
request_id,
response,
}) => {
rpc_send
.send((request_id, RPCResponse::BlockSync(response)))
.await
}
// Pass any non RPC responses through event channel
Some(event) => {
// Update peer on this thread before sending hello
if let NetworkEvent::Hello { source, .. } = &event {
if let NetworkEvent::HelloRequest { channel, .. } = &event {
// TODO should probably add peer with their tipset/ not handled seperately
peer_manager.add_peer(source.clone(), None).await;
peer_manager.add_peer(channel.peer.clone(), None).await;
}

// TODO revisit, doing this to avoid blocking this thread but can handle better
Expand Down
21 changes: 11 additions & 10 deletions blockchain/chain_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use encoding::{Cbor, Error as EncodingError};
use fil_types::SectorInfo;
use filecoin_proofs_api::{post::verify_winning_post, ProverId, PublicReplicaInfo, SectorId};
use forest_libp2p::{
hello::HelloMessage, BlockSyncRequest, NetworkEvent, NetworkMessage, MESSAGES,
hello::HelloRequest, BlockSyncRequest, NetworkEvent, NetworkMessage, MESSAGES,
};
use futures::{
executor::block_on,
Expand Down Expand Up @@ -150,21 +150,22 @@ where

while let Some(event) = self.network.receiver.next().await {
match event {
NetworkEvent::Hello { source, message } => {
NetworkEvent::HelloRequest { request, channel } => {
let source = channel.peer.clone();
info!(
"Message inbound, heaviest tipset cid: {:?}",
message.heaviest_tip_set
request.heaviest_tip_set
);
match self
.fetch_tipset(
source.clone(),
&TipsetKeys::new(message.heaviest_tip_set.clone()),
&TipsetKeys::new(request.heaviest_tip_set.clone()),
)
.await
{
Ok(fts) => {
if self.inform_new_head(source.clone(), &fts).await.is_err() {
warn!("Failed to sync with provided tipset",);
if let Err(e) = self.inform_new_head(source.clone(), &fts).await {
warn!("Failed to sync with provided tipset: {}", e);
};
}
Err(e) => {
Expand All @@ -177,7 +178,7 @@ where
self.network
.hello_request(
peer_id,
HelloMessage {
HelloRequest {
heaviest_tip_set: heaviest.cids().to_vec(),
heaviest_tipset_height: heaviest.epoch(),
heaviest_tipset_weight: heaviest.weight().clone(),
Expand Down Expand Up @@ -1023,7 +1024,7 @@ mod tests {
use beacon::MockBeacon;
use blocks::BlockHeader;
use db::MemoryDB;
use forest_libp2p::NetworkEvent;
use forest_libp2p::{rpc::RequestId, NetworkEvent};
use std::sync::Arc;
use test_utils::{construct_blocksync_response, construct_messages, construct_tipset};

Expand Down Expand Up @@ -1059,9 +1060,9 @@ mod tests {

task::block_on(async {
event_sender
.send(NetworkEvent::RPCResponse {
.send(NetworkEvent::BlockSyncResponse {
// TODO update this, only matching first index of requestId
req_id: 1,
request_id: RequestId(1),
response: rpc_response,
})
.await;
Expand Down
13 changes: 9 additions & 4 deletions blockchain/chain_sync/src/sync/peer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use async_std::task;
use beacon::MockBeacon;
use blocks::BlockHeader;
use db::MemoryDB;
use forest_libp2p::hello::HelloMessage;
use forest_libp2p::{hello::HelloRequest, rpc::ResponseChannel};
use futures::channel::oneshot;
use libp2p::core::PeerId;
use std::time::Duration;

Expand Down Expand Up @@ -48,12 +49,16 @@ fn peer_manager_update() {

let source = PeerId::random();
let source_clone = source.clone();
let (sender, _) = oneshot::channel();

task::block_on(async {
event_sender
.send(NetworkEvent::Hello {
message: HelloMessage::default(),
source,
.send(NetworkEvent::HelloRequest {
request: HelloRequest::default(),
channel: ResponseChannel {
peer: source,
sender,
},
})
.await;

Expand Down
2 changes: 1 addition & 1 deletion forest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2018"
forest_libp2p = { path = "../node/forest_libp2p" }
utils = { path = "../node/utils" }
db = { path = "../node/db", features = ["rocksdb"] }
libp2p = "0.20"
libp2p = { git = "https://github.com/ChainSafe/rust-libp2p", rev = "3661071120d3c07eb9e56932204efcd12cdffc49" }
futures = "0.3.5"
log = "0.4.8"
async-log = "2.0.0"
Expand Down
2 changes: 1 addition & 1 deletion ipld/graphsync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ cid = { package = "forest_cid", path = "../cid", version = "0.1" }
forest_ipld = { path = "../" }
fnv = "1.0.6"
forest_encoding = { path = "../../encoding", version = "0.1" }
libp2p = "0.20"
libp2p = { git = "https://github.com/ChainSafe/rust-libp2p", rev = "3661071120d3c07eb9e56932204efcd12cdffc49" }
futures = "0.3.5"
futures-util = "0.3.5"
futures_codec = "0.4.0"
Expand Down
12 changes: 6 additions & 6 deletions ipld/graphsync/src/message/proto/message.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.15.0. Do not edit
// This file is generated by rust-protobuf 2.15.1. Do not edit
// @generated

// https://github.com/rust-lang/rust-clippy/issues/702
Expand All @@ -25,7 +25,7 @@ use protobuf::ProtobufEnum as ProtobufEnum_imported_for_functions;

/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_15_0;
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_15_1;

#[derive(PartialEq,Clone,Default)]
pub struct Message {
Expand Down Expand Up @@ -254,7 +254,7 @@ impl ::protobuf::Message for Message {
fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
self as &mut dyn (::std::any::Any)
}
fn into_any(self: Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
self
}

Expand Down Expand Up @@ -618,7 +618,7 @@ impl ::protobuf::Message for Message_Request {
fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
self as &mut dyn (::std::any::Any)
}
fn into_any(self: Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
self
}

Expand Down Expand Up @@ -870,7 +870,7 @@ impl ::protobuf::Message for Message_Response {
fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
self as &mut dyn (::std::any::Any)
}
fn into_any(self: Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
self
}

Expand Down Expand Up @@ -1081,7 +1081,7 @@ impl ::protobuf::Message for Message_Block {
fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
self as &mut dyn (::std::any::Any)
}
fn into_any(self: Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
self
}

Expand Down
3 changes: 2 additions & 1 deletion node/forest_libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2018"

[dependencies]
utils = { path = "../utils" }
libp2p = "0.20"
libp2p = { git = "https://github.com/ChainSafe/rust-libp2p", rev = "3661071120d3c07eb9e56932204efcd12cdffc49" }
futures = "0.3.5"
futures-util = "0.3.5"
futures_codec = "0.4.0"
Expand All @@ -22,6 +22,7 @@ fnv = "1.0.6"
smallvec = "1.1.0"
clock = { path = "../clock" }
num-bigint = { path = "../../utils/bigint", package = "forest_bigint" }
async-trait = "0.1"

[dev-dependencies]
forest_address = { path = "../../vm/address" }
Expand Down
Loading