-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Implement bitswap in the network behaviour using libp2p_bitswap. #6795
Changes from 29 commits
2b67bbb
d5d3130
5bd7cb7
6fa276d
926575f
471b171
83bc9c6
be7baf5
6147c6a
f90a39c
2496778
3244af9
8d028d4
ae5fd43
ccb5910
3da54fa
6f3d206
e7ffe2a
8e4c008
435fa52
d61bddc
b8804df
f80e0ad
4b954d2
83de229
a8a0f8b
1f9c171
9923b11
30fd353
38f4667
8187024
5de7605
da1c738
b8f0981
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,6 +54,8 @@ pub struct Behaviour<B: BlockT, H: ExHashT> { | |
peer_info: peer_info::PeerInfoBehaviour, | ||
/// Discovers nodes of the network. | ||
discovery: DiscoveryBehaviour, | ||
/// Exchanges blocks of data with other nodes. | ||
pub(crate) bitswap: libp2p_bitswap::Bitswap, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd prefer if you added methods to |
||
/// Generic request-reponse protocols. | ||
request_responses: request_responses::RequestResponsesBehaviour, | ||
/// Block request handling. | ||
|
@@ -172,6 +174,18 @@ pub enum BehaviourOut<B: BlockT> { | |
/// Events generated by a DHT as a response to get_value or put_value requests as well as the | ||
/// request duration. | ||
Dht(DhtEvent, Duration), | ||
|
||
/// Event generated by bitswap. | ||
Bitswap(BitswapEvent) | ||
} | ||
|
||
/// An event generated by bitswap. | ||
#[derive(Clone, Debug)] | ||
pub enum BitswapEvent { | ||
/// A block was received. | ||
ReceivedBlock(PeerId, tiny_cid::Cid, Box<[u8]>), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this be called multiple times with the same block? What's the relationship between the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
no, unless the events weren't polled until the block arrived and a new want request was submitted.
the cid hash matches the data, the cid is the one submitted in the want request
it's guaranteed to be in response to a want request. |
||
/// A WANT request was received. | ||
ReceivedWant(PeerId, tiny_cid::Cid, i32), | ||
} | ||
|
||
impl<B: BlockT, H: ExHashT> Behaviour<B, H> { | ||
|
@@ -197,6 +211,7 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> { | |
finality_proof_requests, | ||
light_client_handler, | ||
events: VecDeque::new(), | ||
bitswap: libp2p_bitswap::Bitswap::new(), | ||
role, | ||
}) | ||
} | ||
|
@@ -537,6 +552,25 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut> | |
} | ||
} | ||
|
||
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<libp2p_bitswap::BitswapEvent> | ||
for Behaviour<B, H> { | ||
fn inject_event(&mut self, event: libp2p_bitswap::BitswapEvent) { | ||
match event { | ||
libp2p_bitswap::BitswapEvent::ReceivedBlock(peer_id, cid, data) => { | ||
self.events.push_back( | ||
BehaviourOut::Bitswap(BitswapEvent::ReceivedBlock(peer_id, cid, data)) | ||
); | ||
}, | ||
libp2p_bitswap::BitswapEvent::ReceivedWant(peer_id, cid, priority) => { | ||
self.events.push_back( | ||
BehaviourOut::Bitswap(BitswapEvent::ReceivedWant(peer_id, cid, priority)) | ||
); | ||
} | ||
libp2p_bitswap::BitswapEvent::ReceivedCancel(..) => {}, | ||
} | ||
} | ||
} | ||
|
||
impl<B: BlockT, H: ExHashT> Behaviour<B, H> { | ||
fn poll<TEv>(&mut self, _: &mut Context, _: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<TEv, BehaviourOut<B>>> { | ||
if let Some(event) = self.events.pop_front() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -81,7 +81,7 @@ impl<M> QueuedSender<M> { | |
protocol: ConsensusEngineId, | ||
queue_size_limit: usize, | ||
messages_encode: F | ||
) -> (Self, impl Future<Output = ()> + Send + 'static) | ||
) -> (Self, impl Future<Output = ()> + 'static) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this change needed for this PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line 1928 of https://gitlab.parity.io/parity/substrate/-/jobs/658201#L1928 is why we need this. |
||
where | ||
M: Send + 'static, | ||
B: BlockT + 'static, | ||
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -21,6 +21,7 @@ use bytes::Bytes; | |||||||
use libp2p::core::PeerId; | ||||||||
use libp2p::kad::record::Key; | ||||||||
use sp_runtime::ConsensusEngineId; | ||||||||
use crate::behaviour::BitswapEvent; | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
|
||||||||
/// Events generated by DHT as a response to get_value and put_value requests. | ||||||||
#[derive(Debug, Clone)] | ||||||||
|
@@ -45,6 +46,8 @@ pub enum DhtEvent { | |||||||
pub enum Event { | ||||||||
/// Event generated by a DHT. | ||||||||
Dht(DhtEvent), | ||||||||
/// Event generated by Bitswap. | ||||||||
Bitswap(BitswapEvent), | ||||||||
|
||||||||
/// Opened a substream with the given node with the given notifications protocol. | ||||||||
/// | ||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -597,6 +597,21 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> { | |
pub fn add_reserved_peer(&self, peer: String) -> Result<(), String> { | ||
self.service.add_reserved_peer(peer) | ||
} | ||
|
||
/// Get the number of bitswap peers we are connected to. | ||
pub fn bitswap_num_peers(&self) -> usize { | ||
self.network_service.bitswap.peers().count() | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's a "bitswap peer" and what's a "non-bitswap peer"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So these are open connections to other peers. It will send want requests to any peer that connects that speaks bitswap. For each open connection it maintains a list of blocks the other peer wants. If in the future the block is available it sends it to the peer. This architecture is because when I first wrote bitswap it seemed like you could have a strategy that gets a block from a friendly peer to get a block from a new peer. Now I think it should probably just ignore the want requests of blocks it doesn't have. |
||
|
||
/// Get the number of bitswap peers who want a block. | ||
pub fn bitswap_num_peers_want(&self, cid: &tiny_cid::Cid) -> usize { | ||
self.network_service.bitswap.peers_want(cid).count() | ||
} | ||
|
||
/// Get if a specific bitswap peer wants a block. | ||
pub fn bitswap_peer_wants_cid(&self, peer_id: &PeerId, cid: &tiny_cid::Cid) -> bool { | ||
self.network_service.bitswap.peers_want(cid).find(|id| **id == *peer_id).is_some() | ||
} | ||
} | ||
|
||
impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> { | ||
|
@@ -1027,6 +1042,34 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> { | |
.to_worker | ||
.unbounded_send(ServiceToWorkerMsg::OwnBlockImported(hash, number)); | ||
} | ||
|
||
/// Send a bitswap block to a peer. | ||
pub fn bitswap_send_block(&self, peer_id: PeerId, cid: tiny_cid::Cid, data: Box<[u8]>) { | ||
let _ = self | ||
.to_worker | ||
.unbounded_send(ServiceToWorkerMsg::BitswapSendBlock(peer_id, cid, data)); | ||
} | ||
|
||
/// Send a bitswap block to all peers that have the block in their wantlist. | ||
pub fn bitswap_send_block_all(&self, cid: tiny_cid::Cid, data: Box<[u8]>) { | ||
let _ = self | ||
.to_worker | ||
.unbounded_send(ServiceToWorkerMsg::BitswapSendBlockAll(cid, data)); | ||
} | ||
|
||
/// Send a bitswap WANT request to all peers for a block. | ||
pub fn bitswap_want_block(&self, cid: tiny_cid::Cid, priority: i32) { | ||
let _ = self | ||
.to_worker | ||
.unbounded_send(ServiceToWorkerMsg::BitswapWantBlock(cid, priority)); | ||
} | ||
|
||
/// Cancel a bitswap WANT request. | ||
pub fn bitswap_cancel_block(&self, cid: tiny_cid::Cid) { | ||
let _ = self | ||
.to_worker | ||
.unbounded_send(ServiceToWorkerMsg::BitswapCancelBlock(cid)); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs to be documented. What's the general workflow of the thing? What happens if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. bitswap_send_block is usually in response to a want event. the block is added to a queue and if a cancel request is received before the message is sent it will be removed from the queue. reading the code, if the cancel request comes in before the want request was processed, the cancel request will be ignored. I guess it could use some work, but it's not "terrible" |
||
} | ||
|
||
impl<B: BlockT + 'static, H: ExHashT> sp_consensus::SyncOracle | ||
|
@@ -1159,6 +1202,10 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> { | |
DisconnectPeer(PeerId), | ||
UpdateChain, | ||
OwnBlockImported(B::Hash, NumberFor<B>), | ||
BitswapSendBlock(PeerId, tiny_cid::Cid, Box<[u8]>), | ||
BitswapSendBlockAll(tiny_cid::Cid, Box<[u8]>), | ||
BitswapWantBlock(tiny_cid::Cid, i32), | ||
BitswapCancelBlock(tiny_cid::Cid), | ||
} | ||
|
||
/// Main network worker. Must be polled in order for the network to advance. | ||
|
@@ -1301,6 +1348,14 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> { | |
this.network_service.user_protocol_mut().update_chain(), | ||
ServiceToWorkerMsg::OwnBlockImported(hash, number) => | ||
this.network_service.user_protocol_mut().own_block_imported(hash, number), | ||
ServiceToWorkerMsg::BitswapSendBlock(peer_id, cid, block) => | ||
this.network_service.bitswap.send_block(&peer_id, cid, block), | ||
ServiceToWorkerMsg::BitswapSendBlockAll(cid, block) => | ||
this.network_service.bitswap.send_block_all(&cid, &block), | ||
ServiceToWorkerMsg::BitswapWantBlock(cid, priority) => | ||
this.network_service.bitswap.want_block(cid, priority), | ||
ServiceToWorkerMsg::BitswapCancelBlock(cid) => | ||
this.network_service.bitswap.cancel_block(&cid), | ||
} | ||
} | ||
|
||
|
@@ -1506,6 +1561,9 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> { | |
|
||
this.event_streams.send(Event::Dht(event)); | ||
}, | ||
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Bitswap(ev))) => { | ||
this.event_streams.send(Event::Bitswap(ev)); | ||
}, | ||
Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, endpoint, num_established }) => { | ||
trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id); | ||
|
||
|
@@ -1531,14 +1589,14 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> { | |
let reason = match cause { | ||
Some(ConnectionError::IO(_)) => "transport-error", | ||
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( | ||
EitherError::A(EitherError::A(EitherError::A(EitherError::B( | ||
EitherError::A(PingFailure::Timeout)))))))))) => "ping-timeout", | ||
EitherError::A(EitherError::A(EitherError::A(EitherError::A(EitherError::B( | ||
EitherError::A(PingFailure::Timeout))))))))))) => "ping-timeout", | ||
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( | ||
EitherError::A(EitherError::A(EitherError::A(EitherError::A( | ||
NotifsHandlerError::Legacy(LegacyConnectionKillError)))))))))) => "force-closed", | ||
EitherError::A(EitherError::A(EitherError::A(EitherError::A(EitherError::A( | ||
NotifsHandlerError::Legacy(LegacyConnectionKillError))))))))))) => "force-closed", | ||
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( | ||
EitherError::A(EitherError::A(EitherError::A(EitherError::A( | ||
NotifsHandlerError::SyncNotificationsClogged))))))))) => "sync-notifications-clogged", | ||
EitherError::A(EitherError::A(EitherError::A(EitherError::A(EitherError::A( | ||
NotifsHandlerError::SyncNotificationsClogged)))))))))) => "sync-notifications-clogged", | ||
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(_))) => "protocol-error", | ||
Some(ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout)) => "keep-alive-timeout", | ||
None => "actively-closed", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,7 +24,7 @@ use std::iter::Iterator; | |
/// In-memory storage for offchain workers. | ||
#[derive(Debug, Clone, Default)] | ||
pub struct InMemOffchainStorage { | ||
storage: HashMap<Vec<u8>, Vec<u8>>, | ||
pub(crate) storage: HashMap<Vec<u8>, Vec<u8>>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't my code, but similarly I don't think it's great software design. |
||
} | ||
|
||
impl InMemOffchainStorage { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -87,6 +87,11 @@ impl TestPersistentOffchainDB { | |
} | ||
} | ||
} | ||
|
||
/// Get whether the DB is empty. | ||
pub fn is_empty(&self) -> bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't really see how this is relevant for the rest of the PR. |
||
self.persistent.read().storage.is_empty() | ||
} | ||
} | ||
|
||
impl OffchainStorage for TestPersistentOffchainDB { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alphabetical ordering, please!