This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

Initial content discovery #260

Merged · 6 commits · Jul 28, 2020

Changes from all commits
11 changes: 9 additions & 2 deletions src/lib.rs
@@ -777,8 +777,11 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
// exhaust the swarm before possibly causing the swarm to do more work by popping
// off the events from Ipfs and ... this looping goes on for a while.
done = false;
if let SwarmEvent::NewListenAddr(addr) = inner {
self.complete_listening_address_adding(addr);
match inner {
SwarmEvent::NewListenAddr(addr) => {
self.complete_listening_address_adding(addr);
}
_ => trace!("{:?}", inner),
}
}

@@ -962,6 +965,10 @@ mod node {
impl Node {
pub async fn new() -> Self {
let opts = IpfsOptions::inmemory_with_generated_keys();
Node::with_options(opts).await
}

pub async fn with_options(opts: IpfsOptions<TestTypes>) -> Self {
let (ipfs, fut) = UninitializedIpfs::new(opts)
.await
.start()
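The new `with_options` constructor lets a test build a `Node` from a custom `IpfsOptions<TestTypes>` instead of the generated defaults. A minimal sketch of the intended use, mirroring the popular-content test added later in this PR (the `kad_protocol` field comes from that test):

```rust
use ipfs::{IpfsOptions, Node};

// Sketch only: relies on the Node::with_options constructor added above.
async fn custom_node() -> Node {
    let mut opts = IpfsOptions::inmemory_with_generated_keys();
    // Override the Kademlia protocol name before the node starts; this is
    // the option exercised by the new test in tests/kademlia.rs.
    opts.kad_protocol = Some("/ipfs/lan/kad/1.0.0".to_owned());
    Node::with_options(opts).await
}
```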
76 changes: 45 additions & 31 deletions src/p2p/behaviour.rs
@@ -95,64 +95,67 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
info!("kad: peer {} is close", peer);
}
}
GetProviders(Ok(GetProvidersOk { key, providers, .. })) => {
let key = multibase::encode(Base::Base58Btc, key);
if providers.is_empty() {
// FIXME: not sure if this is possible
GetProviders(Ok(GetProvidersOk {
key,
providers,
closest_peers,
})) => {
let key = multibase::encode(Base::Base32Lower, key);
if providers.is_empty() && closest_peers.is_empty() {
warn!("kad: could not find a provider for {}", key);
} else {
for peer in providers {
info!("kad: {} provided by {}", key, peer);
for peer in closest_peers.into_iter().chain(providers.into_iter()) {
info!("kad: {} is provided by {}", key, peer);
self.bitswap.connect(peer);
}
}
}
GetProviders(Err(GetProvidersError::Timeout { key, .. })) => {
let key = multibase::encode(Base::Base58Btc, key);
let key = multibase::encode(Base::Base32Lower, key);
warn!("kad: timed out trying to get providers for {}", key);
}
StartProviding(Ok(AddProviderOk { key })) => {
let key = multibase::encode(Base::Base58Btc, key);
info!("kad: added provider {}", key);
let key = multibase::encode(Base::Base32Lower, key);
info!("kad: providing {}", key);
}
StartProviding(Err(AddProviderError::Timeout { key })) => {
let key = multibase::encode(Base::Base58Btc, key);
warn!("kad: timed out trying to add provider {}", key);
let key = multibase::encode(Base::Base32Lower, key);
warn!("kad: timed out trying to provide {}", key);
}
RepublishProvider(Ok(AddProviderOk { key })) => {
let key = multibase::encode(Base::Base58Btc, key);
let key = multibase::encode(Base::Base32Lower, key);
info!("kad: republished provider {}", key);
}
RepublishProvider(Err(AddProviderError::Timeout { key })) => {
let key = multibase::encode(Base::Base58Btc, key);
let key = multibase::encode(Base::Base32Lower, key);
warn!("kad: timed out trying to republish provider {}", key);
}
GetRecord(Ok(GetRecordOk { records })) => {
for record in records {
let key = multibase::encode(Base::Base58Btc, record.record.key);
let key = multibase::encode(Base::Base32Lower, record.record.key);
info!("kad: got record {}:{:?}", key, record.record.value);
}
}
GetRecord(Err(GetRecordError::NotFound {
key,
closest_peers: _,
})) => {
let key = multibase::encode(Base::Base58Btc, key);
let key = multibase::encode(Base::Base32Lower, key);
warn!("kad: couldn't find record {}", key);
}
GetRecord(Err(GetRecordError::QuorumFailed {
key,
records,
quorum,
})) => {
let key = multibase::encode(Base::Base58Btc, key);
let key = multibase::encode(Base::Base32Lower, key);

warn!(
"kad: quorum failed {} trying to get key {}; got the following:",
quorum, key
);
for record in records {
let key = multibase::encode(Base::Base58Btc, record.record.key);
let key = multibase::encode(Base::Base32Lower, record.record.key);
info!("kad: got record {}:{:?}", key, record.record.value);
}
}
@@ -161,20 +164,20 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
records,
quorum: _,
})) => {
let key = multibase::encode(Base::Base58Btc, key);
let key = multibase::encode(Base::Base32Lower, key);

warn!(
"kad: timed out trying to get key {}; got the following:",
key
);
for record in records {
let key = multibase::encode(Base::Base58Btc, record.record.key);
let key = multibase::encode(Base::Base32Lower, record.record.key);
info!("kad: got record {}:{:?}", key, record.record.value);
}
}
PutRecord(Ok(PutRecordOk { key }))
| RepublishRecord(Ok(PutRecordOk { key })) => {
let key = multibase::encode(Base::Base58Btc, key);
let key = multibase::encode(Base::Base32Lower, key);
info!("kad: successfully put record {}", key);
}
PutRecord(Err(PutRecordError::QuorumFailed {
@@ -187,7 +190,7 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
success: _,
quorum,
})) => {
let key = multibase::encode(Base::Base58Btc, key);
let key = multibase::encode(Base::Base32Lower, key);
info!(
"kad: quorum failed ({}) trying to put record {}",
quorum, key
@@ -203,7 +206,7 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
success: _,
quorum: _,
})) => {
let key = multibase::encode(Base::Base58Btc, key);
let key = multibase::encode(Base::Base32Lower, key);
info!("kad: timed out trying to put record {}", key);
}
}
@@ -216,7 +219,7 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
trace!("kad: routing updated; {}: {:?}", peer, addresses);
}
UnroutablePeer { peer } => {
warn!("kad: peer {} is unroutable", peer);
trace!("kad: peer {} is unroutable", peer);
}
RoutablePeer { peer, address } => {
trace!("kad: peer {} ({}) is routable", peer, address);
@@ -258,9 +261,7 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<BitswapEvent> for Behaviour<
BitswapEvent::ReceivedWant(peer_id, cid, priority) => {
info!(
"Peer {} wants block {} with priority {}",
peer_id.to_base58(),
cid,
priority
peer_id, cid, priority
);

let queued_blocks = self.bitswap().queued_blocks.clone();
@@ -419,16 +420,29 @@ impl<Types: IpfsTypes> Behaviour<Types> {
self.swarm.disconnect(addr)
}

// FIXME: it would be best if get_providers is called only in case the already connected
// peers don't have it
pub fn want_block(&mut self, cid: Cid) {
//let hash = Multihash::from_bytes(cid.to_bytes()).unwrap();
//self.kademlia.get_providers(hash);
let key = cid.to_bytes();
self.kademlia.get_providers(key.into());
self.bitswap.want_block(cid, 1);
}

// FIXME: it would probably be best if this could return a SubscriptionFuture, so
// that the put_block operation truly finishes only when the block is already being
// provided; it is, however, pretty tricky in terms of internal communication between
// Ipfs and IpfsFuture objects - it would currently require some extra back-and-forth
pub fn provide_block(&mut self, cid: Cid) {
Comment on lines +431 to 435

@koivunej (Collaborator), Jul 28, 2020:

I am not 100% with you on this FIXME, but no need to change it now. To elaborate:

"Not 100% with you" in that saving blocks should be as decoupled from providing as possible, and providing should only happen for some subset of blocks: possibly only the pinned roots, or pinned roots and their subroots, or anything more complex that needs to be provided. Here "subroots" means the roots of the individual files inside a larger pinned directory structure.

Decoupling providing from put_block matters because providing can be quite expensive, and we might not want to do the two concurrently at all. Additionally, I am not sure it is a good idea to plan on using start_providing unless we can come up with a really nice backend for the kademlia store. Otherwise we could just use the simpler "push" API and handle the timing ourselves.

Member Author:

Fair enough 👍

info!("Providing block {}", cid.to_string());
//let hash = Multihash::from_bytes(cid.to_bytes()).unwrap();
//self.kademlia.add_providing(PeerId::from_multihash(hash).unwrap());
let key = cid.to_bytes();
match self.kademlia.start_providing(key.into()) {
Ok(_id) => {
// Ok(self.kad_subscriptions.create_subscription(id.into(), None))
}
Err(e) => {
error!("kad: can't provide block {}: {:?}", cid, e);
// Err(anyhow!("kad: can't provide block {}", key))
}
}
}

pub fn stop_providing_block(&mut self, cid: &Cid) {
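A note on the recurring change above: every `multibase::encode(Base::Base58Btc, ...)` call became `multibase::encode(Base::Base32Lower, ...)`, so logged Kademlia keys match the lowercase base32 text form used for CIDv1. A tiny illustrative sketch (the `log_key` helper is hypothetical, not part of this PR):

```rust
use multibase::Base;

// Hypothetical helper mirroring how the handlers above now log record
// keys. Multibase prefixes make the base visible: Base32Lower output
// starts with 'b', while the old Base58Btc output started with 'z'.
fn log_key(key: &[u8]) -> String {
    multibase::encode(Base::Base32Lower, key)
}
```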
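koivunej's review comment above argues for decoupling providing from block insertion and providing only a subset of blocks, such as pinned roots. A hypothetical sketch of that direction, built on the `provide_block` method from this diff (`IpfsTypes`, the `Behaviour` module path, and the `pinned_roots` source are all assumptions here, not part of the PR):

```rust
use cid::Cid;

// Hypothetical: provide only pinned roots instead of every stored block,
// since start_providing is comparatively expensive. The pinned set would
// come from the repo's pin store, which this PR does not wire up.
fn provide_pinned_roots<Types: ipfs::IpfsTypes>(
    behaviour: &mut ipfs::p2p::Behaviour<Types>, // module path assumed
    pinned_roots: &[Cid],
) {
    for root in pinned_roots {
        // provide_block is the method added in this diff
        behaviour.provide_block(root.clone());
    }
}
```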
104 changes: 76 additions & 28 deletions tests/kademlia.rs
@@ -1,51 +1,99 @@
use ipfs::Node;
use libp2p::PeerId;
use log::LevelFilter;
use async_std::future::timeout;
use cid::Cid;
use ipfs::{IpfsOptions, Node};
use libp2p::{Multiaddr, PeerId};
use log::{LevelFilter, SetLoggerError};
use std::time::Duration;

const PEER_COUNT: usize = 20;

#[async_std::test]
async fn kademlia() {
let _ = env_logger::builder()
fn init_test_logging() -> Result<(), SetLoggerError> {
env_logger::builder()
.is_test(true)
.filter(Some("async_std"), LevelFilter::Error)
.init();
.try_init()
}

#[async_std::test]
async fn kademlia_local_peer_discovery() {
const BOOTSTRAPPER_COUNT: usize = 20;

// set up logging
let _ = init_test_logging();

// start up PEER_COUNT bootstrapper nodes
let mut nodes = Vec::with_capacity(PEER_COUNT);
for _ in 0..PEER_COUNT {
nodes.push(Node::new().await);
let mut bootstrappers = Vec::with_capacity(BOOTSTRAPPER_COUNT);
for _ in 0..BOOTSTRAPPER_COUNT {
bootstrappers.push(Node::new().await);
}

// register the bootstrappers' ids and addresses
let mut peers = Vec::with_capacity(PEER_COUNT);
for node in &nodes {
let (id, addrs) = node.identity().await.unwrap();
let mut bootstrapper_ids = Vec::with_capacity(BOOTSTRAPPER_COUNT);
for bootstrapper in &bootstrappers {
let (id, addrs) = bootstrapper.identity().await.unwrap();
let id = PeerId::from_public_key(id);

peers.push((id, addrs));
bootstrapper_ids.push((id, addrs));
}

// connect all the bootstrappers to one another
for (i, (node_id, _)) in peers.iter().enumerate() {
for (peer_id, addrs) in peers.iter().filter(|(peer_id, _)| peer_id != node_id) {
nodes[i]
.add_peer(peer_id.clone(), addrs[0].clone())
for (i, (node_id, _)) in bootstrapper_ids.iter().enumerate() {
for (bootstrapper_id, addrs) in bootstrapper_ids
.iter()
.filter(|(peer_id, _)| peer_id != node_id)
{
bootstrappers[i]
.add_peer(bootstrapper_id.clone(), addrs[0].clone())
.await
.unwrap();
}
}

// introduce an extra peer and connect it to one of the bootstrappers
let extra_peer = Node::new().await;
assert!(extra_peer
.add_peer(peers[0].0.clone(), peers[0].1[0].clone())
// introduce a peer and connect it to one of the bootstrappers
let peer = Node::new().await;
assert!(peer
.add_peer(
bootstrapper_ids[0].0.clone(),
bootstrapper_ids[0].1[0].clone()
)
.await
.is_ok());

// check that kad::bootstrap works
assert!(peer.bootstrap().await.is_ok());

// check that kad::get_closest_peers works
assert!(peer.get_closest_peers().await.is_ok());
}

#[async_std::test]
async fn kademlia_popular_content_discovery() {
// set up logging
let _ = init_test_logging();

let (bootstrapper_id, bootstrapper_addr): (PeerId, Multiaddr) = (
"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"
.parse()
.unwrap(),
"/ip4/104.131.131.82/tcp/4001".parse().unwrap(),
);

// introduce a peer and specify the Kademlia protocol to it
// without a specified protocol, the test will not complete
let mut opts = IpfsOptions::inmemory_with_generated_keys();
opts.kad_protocol = Some("/ipfs/lan/kad/1.0.0".to_owned());
let peer = Node::with_options(opts).await;

// connect it to one of the well-known bootstrappers
assert!(peer
.add_peer(bootstrapper_id, bootstrapper_addr)
.await
.is_ok());

// call kad::bootstrap
assert!(extra_peer.bootstrap().await.is_ok());
// the Cid of the IPFS logo
let cid: Cid = "bafkreicncneocapbypwwe3gl47bzvr3pkpxmmobzn7zr2iaz67df4kjeiq"
.parse()
.unwrap();

// call kad::get_closest_peers
assert!(nodes[0].get_closest_peers().await.is_ok());
assert!(timeout(Duration::from_secs(10), peer.get_block(&cid))
.await
.is_ok());
}
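Worth noting: kademlia_popular_content_discovery dials the well-known public ipfs.io bootstrapper (QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ at 104.131.131.82) and fetches the IPFS logo block, so unlike the local discovery test it requires live network access and can fail when offline.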