From 10974f8918ddd7ded713c187c6409bb8eb6e1418 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Wed, 22 Jul 2020 13:17:31 +0200 Subject: [PATCH 1/6] feat: enable content discovery features Signed-off-by: ljedrz --- src/lib.rs | 7 +++-- src/p2p/behaviour.rs | 70 ++++++++++++++++++++++++++------------------ tests/kademlia.rs | 31 ++++++++++++++++++-- 3 files changed, 74 insertions(+), 34 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 455ecf58e..ecbdd5f14 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -777,8 +777,11 @@ impl Future for IpfsFuture { // exhaust the swarm before possibly causing the swarm to do more work by popping // off the events from Ipfs and ... this looping goes on for a while. done = false; - if let SwarmEvent::NewListenAddr(addr) = inner { - self.complete_listening_address_adding(addr); + match inner { + SwarmEvent::NewListenAddr(addr) => { + self.complete_listening_address_adding(addr); + } + _ => println!("{:?}", inner), } } diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index 8224f59ae..4304d78e9 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -95,41 +95,40 @@ impl NetworkBehaviourEventProcess for Behaviour info!("kad: peer {} is close", peer); } } - GetProviders(Ok(GetProvidersOk { key, providers, .. })) => { - let key = multibase::encode(Base::Base58Btc, key); - if providers.is_empty() { - // FIXME: not sure if this is possible + GetProviders(Ok(GetProvidersOk { key, providers, closest_peers })) => { + let key = multibase::encode(Base::Base32Lower, key); + if providers.is_empty() && closest_peers.is_empty() { warn!("kad: could not find a provider for {}", key); } else { - for peer in providers { - info!("kad: {} provided by {}", key, peer); + for peer in closest_peers.into_iter().chain(providers.into_iter()) { + info!("kad: {} is provided by {}", key, peer); self.bitswap.connect(peer); } } } GetProviders(Err(GetProvidersError::Timeout { key, .. })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); warn!("kad: timed out trying to get providers for {}", key); } StartProviding(Ok(AddProviderOk { key })) => { - let key = multibase::encode(Base::Base58Btc, key); - info!("kad: added provider {}", key); + let key = multibase::encode(Base::Base32Lower, key); + info!("kad: providing {}", key); } StartProviding(Err(AddProviderError::Timeout { key })) => { - let key = multibase::encode(Base::Base58Btc, key); - warn!("kad: timed out trying to add provider {}", key); + let key = multibase::encode(Base::Base32Lower, key); + warn!("kad: timed out trying to provide {}", key); } RepublishProvider(Ok(AddProviderOk { key })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); info!("kad: republished provider {}", key); } RepublishProvider(Err(AddProviderError::Timeout { key })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); warn!("kad: timed out trying to republish provider {}", key); } GetRecord(Ok(GetRecordOk { records })) => { for record in records { - let key = multibase::encode(Base::Base58Btc, record.record.key); + let key = multibase::encode(Base::Base32Lower, record.record.key); info!("kad: got record {}:{:?}", key, record.record.value); } } @@ -137,7 +136,7 @@ impl NetworkBehaviourEventProcess for Behaviour key, closest_peers: _, })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); warn!("kad: couldn't find record {}", key); } GetRecord(Err(GetRecordError::QuorumFailed { @@ -145,14 +144,14 @@ impl NetworkBehaviourEventProcess for Behaviour records, quorum, })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); warn!( "kad: quorum failed {} trying to get key {}; got the following:", quorum, key ); for record in records { - let key = multibase::encode(Base::Base58Btc, record.record.key); + let key = multibase::encode(Base::Base32Lower, record.record.key); info!("kad: got record {}:{:?}", key, record.record.value); } } @@ -161,20 +160,20 @@ impl NetworkBehaviourEventProcess for Behaviour records, quorum: _, })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); warn!( "kad: timed out trying to get key {}; got the following:", key ); for record in records { - let key = multibase::encode(Base::Base58Btc, record.record.key); + let key = multibase::encode(Base::Base32Lower, record.record.key); info!("kad: got record {}:{:?}", key, record.record.value); } } PutRecord(Ok(PutRecordOk { key })) | RepublishRecord(Ok(PutRecordOk { key })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); info!("kad: successfully put record {}", key); } PutRecord(Err(PutRecordError::QuorumFailed { @@ -187,7 +186,7 @@ impl NetworkBehaviourEventProcess for Behaviour success: _, quorum, })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); info!( "kad: quorum failed ({}) trying to put record {}", quorum, key @@ -203,7 +202,7 @@ impl NetworkBehaviourEventProcess for Behaviour success: _, quorum: _, })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); info!("kad: timed out trying to put record {}", key); } } @@ -216,7 +215,7 @@ impl NetworkBehaviourEventProcess for Behaviour trace!("kad: routing updated; {}: {:?}", peer, addresses); } UnroutablePeer { peer } => { - warn!("kad: peer {} is unroutable", peer); + trace!("kad: peer {} is unroutable", peer); } RoutablePeer { peer, address } => { trace!("kad: peer {} ({}) is routable", peer, address); @@ -258,7 +257,7 @@ impl NetworkBehaviourEventProcess for Behaviour< BitswapEvent::ReceivedWant(peer_id, cid, priority) => { info!( "Peer {} wants block {} with priority {}", - peer_id.to_base58(), + peer_id, cid, priority ); @@ -419,16 +418,29 @@ impl Behaviour { self.swarm.disconnect(addr) } + // FIXME: it would be best if get_providers is called only in case the already connected + // peers don't have it pub fn want_block(&mut self, cid: Cid) { - //let hash = Multihash::from_bytes(cid.to_bytes()).unwrap(); - //self.kademlia.get_providers(hash); + let key = cid.to_bytes(); + self.kademlia.get_providers(key.into()); self.bitswap.want_block(cid, 1); } + // FIXME: it would probably be best if this could return a SubscriptionFuture, so + // that the put_block operation truly finishes only when the block is already being + // provided; it is, however, pretty tricky in terms of internal communication between + // Ipfs and IpfsFuture objects - it would currently require some extra back-and-forth pub fn provide_block(&mut self, cid: Cid) { - info!("Providing block {}", cid.to_string()); - //let hash = Multihash::from_bytes(cid.to_bytes()).unwrap(); - //self.kademlia.add_providing(PeerId::from_multihash(hash).unwrap()); + let key = cid.to_bytes(); + match self.kademlia.start_providing(key.clone().into()) { + Ok(_id) => { + // Ok(self.kad_subscriptions.create_subscription(id.into(), None)) + }, + Err(e) => { + error!("kad: can't provide block {}: {:?}", cid, e); + // Err(anyhow!("kad: can't provide block {}", key)) + } + } } pub fn stop_providing_block(&mut self, cid: &Cid) { diff --git a/tests/kademlia.rs b/tests/kademlia.rs index dc67c4805..5001c6743 100644 --- a/tests/kademlia.rs +++ b/tests/kademlia.rs @@ -1,6 +1,10 @@ -use ipfs::Node; +use async_std::future::timeout; +use cid::{Cid, Codec}; +use ipfs::{Block, Node}; use libp2p::PeerId; use log::LevelFilter; +use multihash::Sha2_256; +use std::time::Duration; const PEER_COUNT: usize = 20; @@ -43,9 +47,30 @@ async fn kademlia() { .await .is_ok()); - // call kad::bootstrap + // check that kad::bootstrap works assert!(extra_peer.bootstrap().await.is_ok()); - // call kad::get_closest_peers + // check that kad::get_closest_peers works assert!(nodes[0].get_closest_peers().await.is_ok()); + + // add a Block to the extra peer + let data = Box::from(&b"hello block\n"[..]); + let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data)); + extra_peer.put_block(Block { + cid: cid.clone(), + data: data.clone(), + }) + .await + .unwrap(); + + async_std::task::spawn(async { async_std::task::sleep(Duration::from_secs(1)).await }).await; + + // add another peer, bootstrap it and try to get that Block + let extra_peer2 = Node::new(false).await; + assert!(extra_peer2 + .add_peer(peers[0].0.clone(), peers[0].1[0].clone()) + .await + .is_ok()); + + assert!(timeout(Duration::from_secs(10), extra_peer2.get_block(&cid)).await.is_ok()); } From 49390676bf22cf390cad571508356013a2936af4 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Wed, 22 Jul 2020 14:15:53 +0200 Subject: [PATCH 2/6] feat: rename the variables in the kad test, add a new content discovery one Signed-off-by: ljedrz --- src/lib.rs | 2 +- src/p2p/behaviour.rs | 12 ++--- tests/kademlia.rs | 102 +++++++++++++++++++++++++------------------ 3 files changed, 68 insertions(+), 48 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ecbdd5f14..9b1d58508 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -781,7 +781,7 @@ impl Future for IpfsFuture { SwarmEvent::NewListenAddr(addr) => { self.complete_listening_address_adding(addr); } - _ => println!("{:?}", inner), + _ => trace!("{:?}", inner), } } diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index 4304d78e9..f9de705f5 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -95,7 +95,11 @@ impl NetworkBehaviourEventProcess for Behaviour info!("kad: peer {} is close", peer); } } - GetProviders(Ok(GetProvidersOk { key, providers, closest_peers })) => { + GetProviders(Ok(GetProvidersOk { + key, + providers, + closest_peers, + })) => { let key = multibase::encode(Base::Base32Lower, key); if providers.is_empty() && closest_peers.is_empty() { warn!("kad: could not find a provider for {}", key); @@ -257,9 +261,7 @@ impl NetworkBehaviourEventProcess for Behaviour< BitswapEvent::ReceivedWant(peer_id, cid, priority) => { info!( "Peer {} wants block {} with priority {}", - peer_id, - cid, - priority + peer_id, cid, priority ); let queued_blocks = self.bitswap().queued_blocks.clone(); @@ -435,7 +437,7 @@ impl Behaviour { match self.kademlia.start_providing(key.clone().into()) { Ok(_id) => { // Ok(self.kad_subscriptions.create_subscription(id.into(), None)) - }, + } Err(e) => { error!("kad: can't provide block {}: {:?}", cid, e); // Err(anyhow!("kad: can't provide block {}", key)) diff --git a/tests/kademlia.rs b/tests/kademlia.rs index 5001c6743..8f819e832 100644 --- a/tests/kademlia.rs +++ b/tests/kademlia.rs @@ -1,76 +1,94 @@ use async_std::future::timeout; -use cid::{Cid, Codec}; -use ipfs::{Block, Node}; -use libp2p::PeerId; +use cid::Cid; +use ipfs::Node; +use libp2p::{Multiaddr, PeerId}; use log::LevelFilter; -use multihash::Sha2_256; use std::time::Duration; -const PEER_COUNT: usize = 20; - -#[async_std::test] -async fn kademlia() { - let _ = env_logger::builder() +fn init_test_logging() { + env_logger::builder() .is_test(true) .filter(Some("async_std"), LevelFilter::Error) - .init(); + .init() +} + +#[async_std::test] +async fn kademlia_local_peer_discovery() { + const BOOTSTRAPPER_COUNT: usize = 20; + + // set up logging + init_test_logging(); // start up PEER_COUNT bootstrapper nodes - let mut nodes = Vec::with_capacity(PEER_COUNT); - for _ in 0..PEER_COUNT { - nodes.push(Node::new().await); + let mut bootstrappers = Vec::with_capacity(BOOTSTRAPPER_COUNT); + for _ in 0..BOOTSTRAPPER_COUNT { + bootstrappers.push(Node::new().await); } // register the bootstrappers' ids and addresses - let mut peers = Vec::with_capacity(PEER_COUNT); - for node in &nodes { - let (id, addrs) = node.identity().await.unwrap(); + let mut bootstrapper_ids = Vec::with_capacity(BOOTSTRAPPER_COUNT); + for bootstrapper in &bootstrappers { + let (id, addrs) = bootstrapper.identity().await.unwrap(); let id = PeerId::from_public_key(id); - peers.push((id, addrs)); + bootstrapper_ids.push((id, addrs)); } // connect all the bootstrappers to one another - for (i, (node_id, _)) in peers.iter().enumerate() { - for (peer_id, addrs) in peers.iter().filter(|(peer_id, _)| peer_id != node_id) { - nodes[i] - .add_peer(peer_id.clone(), addrs[0].clone()) + for (i, (node_id, _)) in bootstrapper_ids.iter().enumerate() { + for (bootstrapper_id, addrs) in bootstrapper_ids + .iter() + .filter(|(peer_id, _)| peer_id != node_id) + { + bootstrappers[i] + .add_peer(bootstrapper_id.clone(), addrs[0].clone()) .await .unwrap(); } } - // introduce an extra peer and connect it to one of the bootstrappers - let extra_peer = Node::new().await; - assert!(extra_peer - .add_peer(peers[0].0.clone(), peers[0].1[0].clone()) + // introduce a peer and connect it to one of the bootstrappers + let peer = Node::new().await; + assert!(peer + .add_peer( + bootstrapper_ids[0].0.clone(), + bootstrapper_ids[0].1[0].clone() + ) .await .is_ok()); // check that kad::bootstrap works - assert!(extra_peer.bootstrap().await.is_ok()); + assert!(peer.bootstrap().await.is_ok()); // check that kad::get_closest_peers works - assert!(nodes[0].get_closest_peers().await.is_ok()); + assert!(peer.get_closest_peers().await.is_ok()); +} - // add a Block to the extra peer - let data = Box::from(&b"hello block\n"[..]); - let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data)); - extra_peer.put_block(Block { - cid: cid.clone(), - data: data.clone(), - }) - .await - .unwrap(); +#[async_std::test] +async fn kademlia_popular_content_discovery() { + // set up logging + init_test_logging(); - async_std::task::spawn(async { async_std::task::sleep(Duration::from_secs(1)).await }).await; + let (bootstrapper_id, bootstrapper_addr): (PeerId, Multiaddr) = ( + "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ" + .parse() + .unwrap(), + "/ip4/104.131.131.82/tcp/4001".parse().unwrap(), + ); - // add another peer, bootstrap it and try to get that Block - let extra_peer2 = Node::new(false).await; - assert!(extra_peer2 - .add_peer(peers[0].0.clone(), peers[0].1[0].clone()) + // introduce a peer and connect it to one of the well-known bootstrappers + let peer = Node::new().await; + assert!(peer + .add_peer(bootstrapper_id, bootstrapper_addr) .await .is_ok()); - assert!(timeout(Duration::from_secs(10), extra_peer2.get_block(&cid)).await.is_ok()); + // the Cid of the docs.ipfs.io website + let cid: Cid = "bafybeicfjz7woevc5dxvsskibxpxpofkrdjyslbggvvr3d66ddqu744nne" + .parse() + .unwrap(); + + assert!(timeout(Duration::from_secs(10), peer.get_block(&cid)) + .await + .is_ok()); } From e98e86b188d24798bae3786c2f1b1c2a4f802952 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Thu, 23 Jul 2020 10:46:57 +0200 Subject: [PATCH 3/6] fix: set up the kad protocol in the content discovery test Signed-off-by: ljedrz --- src/lib.rs | 4 ++++ tests/kademlia.rs | 11 ++++++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 9b1d58508..71bfa968e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -965,6 +965,10 @@ mod node { impl Node { pub async fn new() -> Self { let opts = IpfsOptions::inmemory_with_generated_keys(); + Node::with_options(opts).await + } + + pub async fn with_options(opts: IpfsOptions) -> Self { let (ipfs, fut) = UninitializedIpfs::new(opts) .await .start() diff --git a/tests/kademlia.rs b/tests/kademlia.rs index 8f819e832..f5729bae9 100644 --- a/tests/kademlia.rs +++ b/tests/kademlia.rs @@ -1,6 +1,6 @@ use async_std::future::timeout; use cid::Cid; -use ipfs::Node; +use ipfs::{IpfsOptions, Node}; use libp2p::{Multiaddr, PeerId}; use log::LevelFilter; use std::time::Duration; @@ -76,8 +76,13 @@ async fn kademlia_popular_content_discovery() { "/ip4/104.131.131.82/tcp/4001".parse().unwrap(), ); - // introduce a peer and connect it to one of the well-known bootstrappers - let peer = Node::new().await; + // introduce a peer and specify the Kademlia protocol to it + // without a specified protocol, the test will not complete + let mut opts = IpfsOptions::inmemory_with_generated_keys(); + opts.kad_protocol = Some("/ipfs/lan/kad/1.0.0".to_owned()); + let peer = Node::with_options(opts).await; + + // connect it to one of the well-known bootstrappers assert!(peer .add_peer(bootstrapper_id, bootstrapper_addr) .await From 443bfad37279c1d23fa45a347dc516e277ad5868 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Thu, 23 Jul 2020 15:23:30 +0200 Subject: [PATCH 4/6] fix: remove a stray clone Signed-off-by: ljedrz --- src/p2p/behaviour.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index f9de705f5..21b86f5d8 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -434,7 +434,7 @@ impl Behaviour { // Ipfs and IpfsFuture objects - it would currently require some extra back-and-forth pub fn provide_block(&mut self, cid: Cid) { let key = cid.to_bytes(); - match self.kademlia.start_providing(key.clone().into()) { + match self.kademlia.start_providing(key.into()) { Ok(_id) => { // Ok(self.kad_subscriptions.create_subscription(id.into(), None)) } From d0908104a3b2e772ce1233a637e47917c35cc76c Mon Sep 17 00:00:00 2001 From: ljedrz Date: Thu, 23 Jul 2020 15:44:15 +0200 Subject: [PATCH 5/6] fix: don't initialize the global test logger twice Signed-off-by: ljedrz --- tests/kademlia.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/kademlia.rs b/tests/kademlia.rs index f5729bae9..ea537ba8e 100644 --- a/tests/kademlia.rs +++ b/tests/kademlia.rs @@ -2,14 +2,14 @@ use async_std::future::timeout; use cid::Cid; use ipfs::{IpfsOptions, Node}; use libp2p::{Multiaddr, PeerId}; -use log::LevelFilter; +use log::{LevelFilter, SetLoggerError}; use std::time::Duration; -fn init_test_logging() { +fn init_test_logging() -> Result<(), SetLoggerError> { env_logger::builder() .is_test(true) .filter(Some("async_std"), LevelFilter::Error) - .init() + .try_init() } #[async_std::test] @@ -17,7 +17,7 @@ async fn kademlia_local_peer_discovery() { const BOOTSTRAPPER_COUNT: usize = 20; // set up logging - init_test_logging(); + let _ = init_test_logging(); // start up PEER_COUNT bootstrapper nodes let mut bootstrappers = Vec::with_capacity(BOOTSTRAPPER_COUNT); @@ -67,7 +67,7 @@ async fn kademlia_local_peer_discovery() { #[async_std::test] async fn kademlia_popular_content_discovery() { // set up logging - init_test_logging(); + let _ = init_test_logging(); let (bootstrapper_id, bootstrapper_addr): (PeerId, Multiaddr) = ( "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ" From a4be9cddd691aa4f45ee9be14c3ed291511373e2 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Tue, 28 Jul 2020 09:57:23 +0200 Subject: [PATCH 6/6] refactor: change the ipfs.docs Cid to the logo one Signed-off-by: ljedrz --- tests/kademlia.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/kademlia.rs b/tests/kademlia.rs index ea537ba8e..bf87b1985 100644 --- a/tests/kademlia.rs +++ b/tests/kademlia.rs @@ -88,8 +88,8 @@ async fn kademlia_popular_content_discovery() { .await .is_ok()); - // the Cid of the docs.ipfs.io website - let cid: Cid = "bafybeicfjz7woevc5dxvsskibxpxpofkrdjyslbggvvr3d66ddqu744nne" + // the Cid of the IPFS logo + let cid: Cid = "bafkreicncneocapbypwwe3gl47bzvr3pkpxmmobzn7zr2iaz67df4kjeiq" .parse() .unwrap();