diff --git a/src/lib.rs b/src/lib.rs index 455ecf58e..71bfa968e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -777,8 +777,11 @@ impl Future for IpfsFuture { // exhaust the swarm before possibly causing the swarm to do more work by popping // off the events from Ipfs and ... this looping goes on for a while. done = false; - if let SwarmEvent::NewListenAddr(addr) = inner { - self.complete_listening_address_adding(addr); + match inner { + SwarmEvent::NewListenAddr(addr) => { + self.complete_listening_address_adding(addr); + } + _ => trace!("{:?}", inner), } } @@ -962,6 +965,10 @@ mod node { impl Node { pub async fn new() -> Self { let opts = IpfsOptions::inmemory_with_generated_keys(); + Node::with_options(opts).await + } + + pub async fn with_options(opts: IpfsOptions) -> Self { let (ipfs, fut) = UninitializedIpfs::new(opts) .await .start() diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index 8224f59ae..21b86f5d8 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -95,41 +95,44 @@ impl NetworkBehaviourEventProcess for Behaviour info!("kad: peer {} is close", peer); } } - GetProviders(Ok(GetProvidersOk { key, providers, .. })) => { - let key = multibase::encode(Base::Base58Btc, key); - if providers.is_empty() { - // FIXME: not sure if this is possible + GetProviders(Ok(GetProvidersOk { + key, + providers, + closest_peers, + })) => { + let key = multibase::encode(Base::Base32Lower, key); + if providers.is_empty() && closest_peers.is_empty() { warn!("kad: could not find a provider for {}", key); } else { - for peer in providers { - info!("kad: {} provided by {}", key, peer); + for peer in closest_peers.into_iter().chain(providers.into_iter()) { + info!("kad: {} is provided by {}", key, peer); self.bitswap.connect(peer); } } } GetProviders(Err(GetProvidersError::Timeout { key, .. })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); warn!("kad: timed out trying to get providers for {}", key); } StartProviding(Ok(AddProviderOk { key })) => { - let key = multibase::encode(Base::Base58Btc, key); - info!("kad: added provider {}", key); + let key = multibase::encode(Base::Base32Lower, key); + info!("kad: providing {}", key); } StartProviding(Err(AddProviderError::Timeout { key })) => { - let key = multibase::encode(Base::Base58Btc, key); - warn!("kad: timed out trying to add provider {}", key); + let key = multibase::encode(Base::Base32Lower, key); + warn!("kad: timed out trying to provide {}", key); } RepublishProvider(Ok(AddProviderOk { key })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); info!("kad: republished provider {}", key); } RepublishProvider(Err(AddProviderError::Timeout { key })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); warn!("kad: timed out trying to republish provider {}", key); } GetRecord(Ok(GetRecordOk { records })) => { for record in records { - let key = multibase::encode(Base::Base58Btc, record.record.key); + let key = multibase::encode(Base::Base32Lower, record.record.key); info!("kad: got record {}:{:?}", key, record.record.value); } } @@ -137,7 +140,7 @@ impl NetworkBehaviourEventProcess for Behaviour key, closest_peers: _, })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); warn!("kad: couldn't find record {}", key); } GetRecord(Err(GetRecordError::QuorumFailed { @@ -145,14 +148,14 @@ impl NetworkBehaviourEventProcess for Behaviour records, quorum, })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); warn!( "kad: quorum failed {} trying to get key {}; got the following:", quorum, key ); for record in records { - let key = multibase::encode(Base::Base58Btc, record.record.key); + let key = multibase::encode(Base::Base32Lower, record.record.key); info!("kad: got record {}:{:?}", key, record.record.value); } } @@ -161,20 +164,20 @@ impl NetworkBehaviourEventProcess for Behaviour records, quorum: _, })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); warn!( "kad: timed out trying to get key {}; got the following:", key ); for record in records { - let key = multibase::encode(Base::Base58Btc, record.record.key); + let key = multibase::encode(Base::Base32Lower, record.record.key); info!("kad: got record {}:{:?}", key, record.record.value); } } PutRecord(Ok(PutRecordOk { key })) | RepublishRecord(Ok(PutRecordOk { key })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); info!("kad: successfully put record {}", key); } PutRecord(Err(PutRecordError::QuorumFailed { @@ -187,7 +190,7 @@ impl NetworkBehaviourEventProcess for Behaviour success: _, quorum, })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); info!( "kad: quorum failed ({}) trying to put record {}", quorum, key @@ -203,7 +206,7 @@ impl NetworkBehaviourEventProcess for Behaviour success: _, quorum: _, })) => { - let key = multibase::encode(Base::Base58Btc, key); + let key = multibase::encode(Base::Base32Lower, key); info!("kad: timed out trying to put record {}", key); } } @@ -216,7 +219,7 @@ impl NetworkBehaviourEventProcess for Behaviour trace!("kad: routing updated; {}: {:?}", peer, addresses); } UnroutablePeer { peer } => { - warn!("kad: peer {} is unroutable", peer); + trace!("kad: peer {} is unroutable", peer); } RoutablePeer { peer, address } => { trace!("kad: peer {} ({}) is routable", peer, address); @@ -258,9 +261,7 @@ impl NetworkBehaviourEventProcess for Behaviour< BitswapEvent::ReceivedWant(peer_id, cid, priority) => { info!( "Peer {} wants block {} with priority {}", - peer_id.to_base58(), - cid, - priority + peer_id, cid, priority ); let queued_blocks = self.bitswap().queued_blocks.clone(); @@ -419,16 +420,29 @@ impl Behaviour { self.swarm.disconnect(addr) } + // FIXME: it would be best if get_providers is called only in case the already connected + // peers don't have it pub fn want_block(&mut self, cid: Cid) { - //let hash = Multihash::from_bytes(cid.to_bytes()).unwrap(); - //self.kademlia.get_providers(hash); + let key = cid.to_bytes(); + self.kademlia.get_providers(key.into()); self.bitswap.want_block(cid, 1); } + // FIXME: it would probably be best if this could return a SubscriptionFuture, so + // that the put_block operation truly finishes only when the block is already being + // provided; it is, however, pretty tricky in terms of internal communication between + // Ipfs and IpfsFuture objects - it would currently require some extra back-and-forth pub fn provide_block(&mut self, cid: Cid) { - info!("Providing block {}", cid.to_string()); - //let hash = Multihash::from_bytes(cid.to_bytes()).unwrap(); - //self.kademlia.add_providing(PeerId::from_multihash(hash).unwrap()); + let key = cid.to_bytes(); + match self.kademlia.start_providing(key.into()) { + Ok(_id) => { + // Ok(self.kad_subscriptions.create_subscription(id.into(), None)) + } + Err(e) => { + error!("kad: can't provide block {}: {:?}", cid, e); + // Err(anyhow!("kad: can't provide block {}", key)) + } + } } pub fn stop_providing_block(&mut self, cid: &Cid) { diff --git a/tests/kademlia.rs b/tests/kademlia.rs index dc67c4805..bf87b1985 100644 --- a/tests/kademlia.rs +++ b/tests/kademlia.rs @@ -1,51 +1,99 @@ -use ipfs::Node; -use libp2p::PeerId; -use log::LevelFilter; +use async_std::future::timeout; +use cid::Cid; +use ipfs::{IpfsOptions, Node}; +use libp2p::{Multiaddr, PeerId}; +use log::{LevelFilter, SetLoggerError}; +use std::time::Duration; -const PEER_COUNT: usize = 20; - -#[async_std::test] -async fn kademlia() { - let _ = env_logger::builder() +fn init_test_logging() -> Result<(), SetLoggerError> { + env_logger::builder() .is_test(true) .filter(Some("async_std"), LevelFilter::Error) - .init(); + .try_init() +} + +#[async_std::test] +async fn kademlia_local_peer_discovery() { + const BOOTSTRAPPER_COUNT: usize = 20; + + // set up logging + let _ = init_test_logging(); // start up PEER_COUNT bootstrapper nodes - let mut nodes = Vec::with_capacity(PEER_COUNT); - for _ in 0..PEER_COUNT { - nodes.push(Node::new().await); + let mut bootstrappers = Vec::with_capacity(BOOTSTRAPPER_COUNT); + for _ in 0..BOOTSTRAPPER_COUNT { + bootstrappers.push(Node::new().await); } // register the bootstrappers' ids and addresses - let mut peers = Vec::with_capacity(PEER_COUNT); - for node in &nodes { - let (id, addrs) = node.identity().await.unwrap(); + let mut bootstrapper_ids = Vec::with_capacity(BOOTSTRAPPER_COUNT); + for bootstrapper in &bootstrappers { + let (id, addrs) = bootstrapper.identity().await.unwrap(); let id = PeerId::from_public_key(id); - peers.push((id, addrs)); + bootstrapper_ids.push((id, addrs)); } // connect all the bootstrappers to one another - for (i, (node_id, _)) in peers.iter().enumerate() { - for (peer_id, addrs) in peers.iter().filter(|(peer_id, _)| peer_id != node_id) { - nodes[i] - .add_peer(peer_id.clone(), addrs[0].clone()) + for (i, (node_id, _)) in bootstrapper_ids.iter().enumerate() { + for (bootstrapper_id, addrs) in bootstrapper_ids + .iter() + .filter(|(peer_id, _)| peer_id != node_id) + { + bootstrappers[i] + .add_peer(bootstrapper_id.clone(), addrs[0].clone()) .await .unwrap(); } } - // introduce an extra peer and connect it to one of the bootstrappers - let extra_peer = Node::new().await; - assert!(extra_peer - .add_peer(peers[0].0.clone(), peers[0].1[0].clone()) + // introduce a peer and connect it to one of the bootstrappers + let peer = Node::new().await; + assert!(peer + .add_peer( + bootstrapper_ids[0].0.clone(), + bootstrapper_ids[0].1[0].clone() + ) + .await + .is_ok()); + + // check that kad::bootstrap works + assert!(peer.bootstrap().await.is_ok()); + + // check that kad::get_closest_peers works + assert!(peer.get_closest_peers().await.is_ok()); +} + +#[async_std::test] +async fn kademlia_popular_content_discovery() { + // set up logging + let _ = init_test_logging(); + + let (bootstrapper_id, bootstrapper_addr): (PeerId, Multiaddr) = ( + "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ" + .parse() + .unwrap(), + "/ip4/104.131.131.82/tcp/4001".parse().unwrap(), + ); + + // introduce a peer and specify the Kademlia protocol to it + // without a specified protocol, the test will not complete + let mut opts = IpfsOptions::inmemory_with_generated_keys(); + opts.kad_protocol = Some("/ipfs/lan/kad/1.0.0".to_owned()); + let peer = Node::with_options(opts).await; + + // connect it to one of the well-known bootstrappers + assert!(peer + .add_peer(bootstrapper_id, bootstrapper_addr) .await .is_ok()); - // call kad::bootstrap - assert!(extra_peer.bootstrap().await.is_ok()); + // the Cid of the IPFS logo + let cid: Cid = "bafkreicncneocapbypwwe3gl47bzvr3pkpxmmobzn7zr2iaz67df4kjeiq" + .parse() + .unwrap(); - // call kad::get_closest_peers - assert!(nodes[0].get_closest_peers().await.is_ok()); + assert!(timeout(Duration::from_secs(10), peer.get_block(&cid)) + .await + .is_ok()); }