From 2b89ae9c4282e01727a12d8f91ce373f30d2c9db Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 16 Jun 2022 20:12:48 +0200 Subject: [PATCH] feat(kad): report get_providers call event based --- examples/distributed-key-value-store.rs | 19 ++- examples/file-sharing.rs | 30 +++- misc/metrics/src/kad.rs | 10 +- protocols/kad/src/behaviour.rs | 155 +++++++++--------- protocols/kad/src/behaviour/test.rs | 85 ++++++---- protocols/kad/src/lib.rs | 10 +- protocols/kad/src/query.rs | 12 ++ protocols/kad/src/query/peers/closest.rs | 13 ++ .../kad/src/query/peers/closest/disjoint.rs | 9 + protocols/kad/src/query/peers/fixed.rs | 10 ++ 10 files changed, 223 insertions(+), 130 deletions(-) diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store.rs index 6bf28bf0ff92..ab106978660c 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store.rs @@ -53,6 +53,7 @@ use libp2p::{ swarm::{NetworkBehaviourEventProcess, SwarmEvent}, NetworkBehaviour, PeerId, Swarm, }; +use libp2p_kad::{GetProvidersProgress, QueryProgress}; use std::error::Error; #[async_std::main] @@ -89,16 +90,16 @@ async fn main() -> Result<(), Box> { // Called when `kademlia` produces an event. fn inject_event(&mut self, message: KademliaEvent) { match message { - KademliaEvent::OutboundQueryCompleted { result, .. } => match result { - QueryResult::GetProviders(Ok(ok)) => { - for peer in ok.providers { - println!( - "Peer {:?} provides key {:?}", - peer, - std::str::from_utf8(ok.key.as_ref()).unwrap() - ); - } + KademliaEvent::OutboundQueryProgressed { result, .. } => match result { + QueryProgress::GetProviders(GetProvidersProgress { key, provider, .. }) => { + println!( + "Peer {:?} provides key {:?}", + provider, + std::str::from_utf8(key.as_ref()).unwrap() + ); } + }, + KademliaEvent::OutboundQueryCompleted { result, .. } => match result { QueryResult::GetProviders(Err(err)) => { eprintln!("Failed to get providers: {:?}", err); } diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index 492eee11eeb0..bc4ec08217e0 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -211,7 +211,7 @@ mod network { use libp2p::identity; use libp2p::identity::ed25519; use libp2p::kad::record::store::MemoryStore; - use libp2p::kad::{GetProvidersOk, Kademlia, KademliaEvent, QueryId, QueryResult}; + use libp2p::kad::{Kademlia, KademliaEvent, QueryId, QueryResult}; use libp2p::multiaddr::Protocol; use libp2p::request_response::{ ProtocolSupport, RequestId, RequestResponse, RequestResponseCodec, RequestResponseEvent, @@ -219,6 +219,7 @@ mod network { }; use libp2p::swarm::{ConnectionHandlerUpgrErr, SwarmBuilder, SwarmEvent}; use libp2p::{NetworkBehaviour, Swarm}; + use libp2p_kad::{GetProvidersProgress, QueryProgress}; use std::collections::{HashMap, HashSet}; use std::iter; @@ -324,12 +325,12 @@ mod network { /// Find the providers for the given file on the DHT. pub async fn get_providers(&mut self, file_name: String) -> HashSet { - let (sender, receiver) = oneshot::channel(); + let (sender, receiver) = mpsc::channel(0); self.sender .send(Command::GetProviders { file_name, sender }) .await .expect("Command receiver not to be dropped."); - receiver.await.expect("Sender not to be dropped.") + receiver.collect().await } /// Request the content of the given file from the given peer. @@ -365,7 +366,7 @@ mod network { event_sender: mpsc::Sender, pending_dial: HashMap>>>, pending_start_providing: HashMap>, - pending_get_providers: HashMap>>, + pending_get_providers: HashMap>, pending_request_file: HashMap>>>, } @@ -421,18 +422,31 @@ mod network { .expect("Completed query to be previously pending."); let _ = sender.send(()); } + SwarmEvent::Behaviour(ComposedEvent::Kademlia( + KademliaEvent::OutboundQueryProgressed { + id, + result: QueryProgress::GetProviders(GetProvidersProgress { provider, .. }), + .. + }, + )) => { + let _ = self + .pending_get_providers + .get_mut(&id) + .expect("Completed query to be previously pending.") + .send(provider); + } SwarmEvent::Behaviour(ComposedEvent::Kademlia( KademliaEvent::OutboundQueryCompleted { id, - result: QueryResult::GetProviders(Ok(GetProvidersOk { providers, .. })), + result: QueryResult::GetProviders(..), .. }, )) => { + // Drop channel to signal query is complete. let _ = self .pending_get_providers .remove(&id) - .expect("Completed query to be previously pending.") - .send(providers); + .expect("Completed query to be previously pending."); } SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {} SwarmEvent::Behaviour(ComposedEvent::RequestResponse( @@ -620,7 +634,7 @@ mod network { }, GetProviders { file_name: String, - sender: oneshot::Sender>, + sender: mpsc::Sender, }, RequestFile { file_name: String, diff --git a/misc/metrics/src/kad.rs b/misc/metrics/src/kad.rs index 8ab71befe91b..518ad40eef49 100644 --- a/misc/metrics/src/kad.rs +++ b/misc/metrics/src/kad.rs @@ -208,10 +208,7 @@ impl super::Recorder for super::Metrics { } }, libp2p_kad::QueryResult::GetProviders(result) => match result { - Ok(ok) => self - .kad - .query_result_get_providers_ok - .observe(ok.providers.len() as f64), + Ok(_) => {} Err(error) => { self.kad .query_result_get_providers_error @@ -222,6 +219,11 @@ impl super::Recorder for super::Metrics { _ => {} } } + libp2p_kad::KademliaEvent::OutboundQueryProgressed { result, .. } => match result { + libp2p_kad::QueryProgress::GetProviders(_) => { + self.kad.query_result_get_providers_ok.observe(1.); + } + }, libp2p_kad::KademliaEvent::RoutingUpdated { is_new_peer, old_peer, diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index b6a69aab9c59..3fb015ce2bb4 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -919,28 +919,41 @@ where /// /// The result of this operation is delivered in a /// reported via [`KademliaEvent::OutboundQueryCompleted{QueryResult::GetProviders}`]. - pub fn get_providers(&mut self, key: record::Key, limit: ProviderLimit) -> QueryId { - let providers = self + pub fn get_providers(&mut self, key: record::Key) -> QueryId { + let providers: HashSet<_> = self .store .providers(&key) .into_iter() .filter(|p| !p.is_expired(Instant::now())) - .map(|p| p.provider); - - let providers = match limit { - ProviderLimit::None => providers.collect(), - ProviderLimit::N(limit) => providers.take(limit.into()).collect(), - }; + .map(|p| p.provider) + .collect(); let info = QueryInfo::GetProviders { key: key.clone(), - providers, - limit, + providers_found: providers.len(), }; - let target = kbucket::Key::new(key); + + let target = kbucket::Key::new(key.clone()); let peers = self.kbuckets.closest_keys(&target); let inner = QueryInner::new(info); - self.queries.add_iter_closest(target.clone(), peers, inner) + let id = self.queries.add_iter_closest(target.clone(), peers, inner); + + for (i, provider) in providers.into_iter().enumerate() { + self.queued_events + .push_back(NetworkBehaviourAction::GenerateEvent( + KademliaEvent::OutboundQueryProgressed { + id, + result: QueryProgress::GetProviders(GetProvidersProgress { + key: key.clone(), + provider, + closest_peers: Default::default(), + }), + count: i, + }, + )); + } + + id } /// Processes discovered peers from a successful request in an iterative `Query`. @@ -1264,18 +1277,10 @@ where })), }), - QueryInfo::GetProviders { - key, - providers, - limit: _, - } => Some(KademliaEvent::OutboundQueryCompleted { + QueryInfo::GetProviders { key, .. } => Some(KademliaEvent::OutboundQueryCompleted { id: query_id, stats: result.stats, - result: QueryResult::GetProviders(Ok(GetProvidersOk { - key, - providers, - closest_peers: result.peers.collect(), - })), + result: QueryResult::GetProviders(Ok(GetProvidersOk { key })), }), QueryInfo::AddProvider { @@ -1561,16 +1566,11 @@ where })), }), - QueryInfo::GetProviders { - key, - providers, - limit: _, - } => Some(KademliaEvent::OutboundQueryCompleted { + QueryInfo::GetProviders { key, .. } => Some(KademliaEvent::OutboundQueryCompleted { id: query_id, stats: result.stats, result: QueryResult::GetProviders(Err(GetProvidersError::Timeout { key, - providers, closest_peers: result.peers.collect(), })), }), @@ -2067,9 +2067,31 @@ where let peers = closer_peers.iter().chain(provider_peers.iter()); self.discovered(&user_data, &source, peers); if let Some(query) = self.queries.get_mut(&user_data) { - if let QueryInfo::GetProviders { providers, .. } = &mut query.inner.info { + let closest_peers: Vec<_> = query.as_intermediary_result().collect(); + if let QueryInfo::GetProviders { + ref key, + ref mut providers_found, + .. + } = query.inner.info + { for peer in provider_peers { - providers.insert(peer.node_id); + let key = key.clone(); + let provider = peer.node_id; + let count = *providers_found; + *providers_found += 1; + + self.queued_events + .push_back(NetworkBehaviourAction::GenerateEvent( + KademliaEvent::OutboundQueryProgressed { + id: user_data, + result: QueryProgress::GetProviders(GetProvidersProgress { + key, + provider, + closest_peers: closest_peers.clone(), + }), + count, + }, + )); } } } @@ -2342,30 +2364,6 @@ where query.on_success(&peer_id, vec![]) } - if let QueryInfo::GetProviders { - key: _, - providers, - limit, - } = &query.inner.info - { - match limit { - ProviderLimit::None => { - // No limit, so wait for enough peers to respond. - } - ProviderLimit::N(n) => { - // Check if we have enough providers. - if usize::from(*n) <= providers.len() { - debug!( - "found enough providers {}/{}, finishing", - providers.len(), - n - ); - query.finish(); - } - } - } - } - if self.connected_peers.contains(&peer_id) { self.queued_events .push_back(NetworkBehaviourAction::NotifyHandler { @@ -2398,15 +2396,6 @@ where } } -/// Specifies the number of provider records fetched. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum ProviderLimit { - /// No limit on the number of records. - None, - /// Finishes the query as soon as this many records have been found. - N(NonZeroUsize), -} - /// A quorum w.r.t. the configured replication factor specifies the minimum /// number of distinct nodes that must be successfully contacted in order /// for a query to succeed. @@ -2455,16 +2444,26 @@ pub enum KademliaEvent { // is made of multiple requests across multiple remote peers. InboundRequest { request: InboundRequest }, - /// An outbound query has produced a result. + /// An outbound query has finished. OutboundQueryCompleted { /// The ID of the query that finished. id: QueryId, - /// The result of the query. + /// The optional final result of the query. result: QueryResult, /// Execution statistics from the query. stats: QueryStats, }, + /// An outbound query has produced a result. + OutboundQueryProgressed { + /// The ID of the query that progressed. + id: QueryId, + /// The intermediary result of the query. + result: QueryProgress, + /// Number of how many of these have already been emitted for this query. + count: usize, + }, + /// The routing table has been updated with a new peer and / or /// address, thereby possibly evicting another peer. RoutingUpdated { @@ -2577,6 +2576,13 @@ pub enum QueryResult { RepublishRecord(PutRecordResult), } +/// The results of Kademlia queries. +#[derive(Debug, Clone)] +pub enum QueryProgress { + /// The result of [`Kademlia::get_providers`]. + GetProviders(GetProvidersProgress), +} + /// The result of [`Kademlia::get_record`]. pub type GetRecordResult = Result; @@ -2744,21 +2750,26 @@ impl GetClosestPeersError { /// The result of [`Kademlia::get_providers`]. pub type GetProvidersResult = Result; -/// The successful result of [`Kademlia::get_providers`]. +/// The successful progress result of [`Kademlia::get_providers`]. #[derive(Debug, Clone)] -pub struct GetProvidersOk { +pub struct GetProvidersProgress { pub key: record::Key, - pub providers: HashSet, + pub provider: PeerId, pub closest_peers: Vec, } +/// The final successful result of [`Kademlia::get_providers`]. +#[derive(Debug, Clone)] +pub struct GetProvidersOk { + pub key: record::Key, +} + /// The error result of [`Kademlia::get_providers`]. #[derive(Debug, Clone, Error)] pub enum GetProvidersError { #[error("the request timed out")] Timeout { key: record::Key, - providers: HashSet, closest_peers: Vec, }, } @@ -2904,10 +2915,8 @@ pub enum QueryInfo { GetProviders { /// The key for which to search for providers. key: record::Key, - /// The found providers. - providers: HashSet, - /// The limit of how many providers to find, - limit: ProviderLimit, + /// The number of providers already found. + providers_found: usize, }, /// A (repeated) query initiated by [`Kademlia::start_providing`]. diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 6da316bf47bc..61fc5adf80a7 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -1352,30 +1352,34 @@ fn get_providers_single() { } }); - let query_id = single_swarm - .behaviour_mut() - .get_providers(key.clone(), ProviderLimit::None); + let query_id = single_swarm.behaviour_mut().get_providers(key.clone()); + let mut found_key = None; block_on(async { - match single_swarm.next().await.unwrap() { - SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted { - id, - result: - QueryResult::GetProviders(Ok(GetProvidersOk { - key: found_key, - providers, - .. - })), - .. - }) if id == query_id => { - assert_eq!(key, found_key); - assert_eq!( - single_swarm.local_peer_id(), - providers.iter().next().unwrap() - ); + loop { + match single_swarm.next().await.unwrap() { + SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted { + id, + result: QueryResult::GetProviders(done), + .. + }) if id == query_id => { + assert_eq!(key, done.unwrap().key); + assert_eq!(key, found_key.unwrap()); + break; + } + SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed { + id, + result: + QueryProgress::GetProviders(GetProvidersProgress { key, provider, .. }), + .. + }) if id == query_id => { + found_key = Some(key); + + assert_eq!(single_swarm.local_peer_id(), &provider); + } + SwarmEvent::Behaviour(e) => panic!("Unexpected event: {:?}", e), + _ => {} } - SwarmEvent::Behaviour(e) => panic!("Unexpected event: {:?}", e), - _ => {} } }); } @@ -1410,31 +1414,50 @@ fn get_providers_limit() { } // Query with expecting a single provider. - let query_id = swarms[0] - .behaviour_mut() - .get_providers(key.clone(), ProviderLimit::N(N.try_into().unwrap())); + let query_id = swarms[0].behaviour_mut().get_providers(key.clone()); + + let mut providers = vec![]; block_on(poll_fn(move |ctx| { for (i, swarm) in swarms.iter_mut().enumerate() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: - QueryResult::GetProviders(Ok(GetProvidersOk { + QueryProgress::GetProviders(GetProvidersProgress { key: found_key, - providers, + provider, .. - })), - .. + }), + count, }, ))) if i == 0 && id == query_id => { // There are a total of 2 providers. - assert_eq!(providers.len(), std::cmp::min(N, 2)); + assert_eq!(providers.len(), count); + providers.push(provider); assert_eq!(key, found_key); // Providers should be either 2 or 3 - assert_ne!(swarm.local_peer_id(), providers.iter().next().unwrap()); + assert_ne!(swarm.local_peer_id(), &provider); + + // If we have all providers, finish. + if providers.len() == N { + swarm.behaviour_mut().query_mut(&id).unwrap().finish(); + } + + return Poll::Ready(()); + } + Poll::Ready(Some(SwarmEvent::Behaviour( + KademliaEvent::OutboundQueryCompleted { + id, + result: + QueryResult::GetProviders(Ok(GetProvidersOk { key: found_key })), + .. + }, + ))) if i == 0 && id == query_id => { + assert_eq!(key, found_key); + assert_eq!(providers.len(), std::cmp::min(N, 2)); return Poll::Ready(()); } Poll::Ready(..) => {} diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs index 0e0374b1d387..910cf47b5396 100644 --- a/protocols/kad/src/lib.rs +++ b/protocols/kad/src/lib.rs @@ -60,14 +60,14 @@ pub use addresses::Addresses; pub use behaviour::{ AddProviderContext, AddProviderError, AddProviderOk, AddProviderPhase, AddProviderResult, BootstrapError, BootstrapOk, BootstrapResult, GetClosestPeersError, GetClosestPeersOk, - GetClosestPeersResult, GetProvidersError, GetProvidersOk, GetProvidersResult, GetRecordError, - GetRecordOk, GetRecordResult, InboundRequest, PeerRecord, PutRecordContext, PutRecordError, - PutRecordOk, PutRecordPhase, PutRecordResult, QueryInfo, QueryMut, QueryRef, QueryResult, - QueryStats, + GetClosestPeersResult, GetProvidersError, GetProvidersOk, GetProvidersProgress, + GetProvidersResult, GetRecordError, GetRecordOk, GetRecordResult, InboundRequest, PeerRecord, + PutRecordContext, PutRecordError, PutRecordOk, PutRecordPhase, PutRecordResult, QueryInfo, + QueryMut, QueryProgress, QueryRef, QueryResult, QueryStats, }; pub use behaviour::{ Kademlia, KademliaBucketInserts, KademliaCaching, KademliaConfig, KademliaEvent, - KademliaStoreInserts, ProviderLimit, Quorum, + KademliaStoreInserts, Quorum, }; pub use protocol::KadConnectionType; pub use query::QueryId; diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs index 708c758464f1..0727a95d7e66 100644 --- a/protocols/kad/src/query.rs +++ b/protocols/kad/src/query.rs @@ -419,6 +419,18 @@ impl Query { stats: self.stats, } } + + pub fn as_intermediary_result(&self) -> impl Iterator + '_ { + match self.peer_iter { + QueryPeerIter::Closest(ref iter) => { + Either::Left(Either::Left(iter.as_intermediary_result())) + } + QueryPeerIter::ClosestDisjoint(ref iter) => { + Either::Left(Either::Right(iter.as_intermediary_result())) + } + QueryPeerIter::Fixed(ref iter) => Either::Right(iter.as_intermediary_result()), + } + } } /// The result of a `Query`. diff --git a/protocols/kad/src/query/peers/closest.rs b/protocols/kad/src/query/peers/closest.rs index 2b3cb1242740..da20854a7422 100644 --- a/protocols/kad/src/query/peers/closest.rs +++ b/protocols/kad/src/query/peers/closest.rs @@ -365,6 +365,19 @@ impl ClosestPeersIter { self.state == State::Finished } + pub fn as_intermediary_result(&self) -> impl Iterator + '_ { + self.closest_peers + .iter() + .filter_map(|(_, peer)| { + if let PeerState::Succeeded = peer.state { + Some(peer.key.clone().into_preimage()) + } else { + None + } + }) + .take(self.config.num_results.get()) + } + /// Consumes the iterator, returning the closest peers. pub fn into_result(self) -> impl Iterator { self.closest_peers diff --git a/protocols/kad/src/query/peers/closest/disjoint.rs b/protocols/kad/src/query/peers/closest/disjoint.rs index af91b8c1f0b5..a6b2aadf624a 100644 --- a/protocols/kad/src/query/peers/closest/disjoint.rs +++ b/protocols/kad/src/query/peers/closest/disjoint.rs @@ -331,6 +331,15 @@ impl ClosestDisjointPeersIter { ResultIter::new(self.target, result_per_path).map(Key::into_preimage) } + + pub fn as_intermediary_result(&self) -> impl Iterator + '_ { + let result_per_path = self + .iters + .iter() + .map(|iter| iter.clone().into_result().map(Key::from)); + + ResultIter::new(self.target.clone(), result_per_path).map(Key::into_preimage) + } } /// Index into the [`ClosestDisjointPeersIter`] `iters` vector. diff --git a/protocols/kad/src/query/peers/fixed.rs b/protocols/kad/src/query/peers/fixed.rs index 022a3de6f9f7..7afa08935c0c 100644 --- a/protocols/kad/src/query/peers/fixed.rs +++ b/protocols/kad/src/query/peers/fixed.rs @@ -170,6 +170,16 @@ impl FixedPeersIter { } }) } + + pub fn as_intermediary_result(&self) -> impl Iterator + '_ { + self.peers.iter().filter_map(|(p, s)| { + if let PeerState::Succeeded = s { + Some(*p) + } else { + None + } + }) + } } #[cfg(test)]