diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store.rs index 1482d841ebe..bd5f4be916b 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store.rs @@ -52,6 +52,7 @@ use libp2p::{ swarm::{NetworkBehaviour, SwarmEvent}, PeerId, Swarm, }; +use libp2p_kad::{GetProvidersOk, GetRecordOk}; use std::error::Error; #[async_std::main] @@ -120,33 +121,32 @@ async fn main() -> Result<(), Box> { swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr); } } - SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, ..})) => { + SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryProgressed { result, ..})) => { match result { - QueryResult::GetProviders(Ok(ok)) => { - for peer in ok.providers { + QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { key, providers, .. })) => { + for peer in providers { println!( - "Peer {:?} provides key {:?}", - peer, - std::str::from_utf8(ok.key.as_ref()).unwrap() + "Peer {peer:?} provides key {:?}", + std::str::from_utf8(key.as_ref()).unwrap() ); } } QueryResult::GetProviders(Err(err)) => { eprintln!("Failed to get providers: {err:?}"); } - QueryResult::GetRecord(Ok(ok)) => { - for PeerRecord { + QueryResult::GetRecord(Ok( + GetRecordOk::FoundRecord(PeerRecord { record: Record { key, value, .. }, .. - } in ok.records - { - println!( - "Got record {:?} {:?}", - std::str::from_utf8(key.as_ref()).unwrap(), - std::str::from_utf8(&value).unwrap(), - ); - } + }) + )) => { + println!( + "Got record {:?} {:?}", + std::str::from_utf8(key.as_ref()).unwrap(), + std::str::from_utf8(&value).unwrap(), + ); } + QueryResult::GetRecord(Ok(_)) => {} QueryResult::GetRecord(Err(err)) => { eprintln!("Failed to get record: {err:?}"); } @@ -191,7 +191,7 @@ fn handle_input_line(kademlia: &mut Kademlia, line: String) { } } }; - kademlia.get_record(key, Quorum::One); + kademlia.get_record(key); } Some("GET_PROVIDERS") => { let key = { diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index 620ce6cd5d9..ef7d1fdf57d 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -413,7 +413,7 @@ mod network { ) { match event { SwarmEvent::Behaviour(ComposedEvent::Kademlia( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::StartProviding(_), .. @@ -426,18 +426,37 @@ mod network { let _ = sender.send(()); } SwarmEvent::Behaviour(ComposedEvent::Kademlia( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, - result: QueryResult::GetProviders(Ok(GetProvidersOk { providers, .. })), + result: + QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { + providers, + .. + })), .. }, )) => { - let _ = self - .pending_get_providers - .remove(&id) - .expect("Completed query to be previously pending.") - .send(providers); + if let Some(sender) = self.pending_get_providers.remove(&id) { + sender.send(providers).expect("Receiver not to be dropped"); + + // Finish the query. We are only interested in the first result. + self.swarm + .behaviour_mut() + .kademlia + .query_mut(&id) + .unwrap() + .finish(); + } } + SwarmEvent::Behaviour(ComposedEvent::Kademlia( + KademliaEvent::OutboundQueryProgressed { + result: + QueryResult::GetProviders(Ok( + GetProvidersOk::FinishedWithNoAdditionalRecord { .. }, + )), + .. + }, + )) => {} SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {} SwarmEvent::Behaviour(ComposedEvent::RequestResponse( RequestResponseEvent::Message { message, .. }, diff --git a/examples/ipfs-kad.rs b/examples/ipfs-kad.rs index 3764f8c9ad6..e088bba660d 100644 --- a/examples/ipfs-kad.rs +++ b/examples/ipfs-kad.rs @@ -81,7 +81,7 @@ async fn main() -> Result<(), Box> { loop { let event = swarm.select_next_some().await; - if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted { + if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed { result: QueryResult::GetClosestPeers(result), .. }) = event diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index e321495e3d6..31beaa1227c 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -20,8 +20,12 @@ - Update `rust-version` to reflect the actual MSRV: 1.62.0. See [PR 3090]. +- Changed `Metrics::query_result_get_record_ok` from `Histogram` to a `Counter`. + See [PR 2712]. + [PR 2982]: https://github.com/libp2p/rust-libp2p/pull/2982/ [PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090 +[PR 2712]: https://github.com/libp2p/rust-libp2p/pull/2712 # 0.10.0 diff --git a/misc/metrics/src/kad.rs b/misc/metrics/src/kad.rs index 5e5a1056060..6d10e5b1318 100644 --- a/misc/metrics/src/kad.rs +++ b/misc/metrics/src/kad.rs @@ -25,7 +25,7 @@ use prometheus_client::metrics::histogram::{exponential_buckets, Histogram}; use prometheus_client::registry::{Registry, Unit}; pub struct Metrics { - query_result_get_record_ok: Histogram, + query_result_get_record_ok: Counter, query_result_get_record_error: Family, query_result_get_closest_peers_ok: Histogram, @@ -48,7 +48,7 @@ impl Metrics { pub fn new(registry: &mut Registry) -> Self { let sub_registry = registry.sub_registry_with_prefix("kad"); - let query_result_get_record_ok = Histogram::new(exponential_buckets(1.0, 2.0, 10)); + let query_result_get_record_ok = Counter::default(); sub_registry.register( "query_result_get_record_ok", "Number of records returned by a successful Kademlia get record query.", @@ -162,7 +162,7 @@ impl Metrics { impl super::Recorder for Metrics { fn record(&self, event: &libp2p_kad::KademliaEvent) { match event { - libp2p_kad::KademliaEvent::OutboundQueryCompleted { result, stats, .. } => { + libp2p_kad::KademliaEvent::OutboundQueryProgressed { result, stats, .. } => { self.query_result_num_requests .get_or_create(&result.into()) .observe(stats.num_requests().into()); @@ -180,9 +180,10 @@ impl super::Recorder for Metrics { match result { libp2p_kad::QueryResult::GetRecord(result) => match result { - Ok(ok) => self - .query_result_get_record_ok - .observe(ok.records.len() as f64), + Ok(libp2p_kad::GetRecordOk::FoundRecord(_)) => { + self.query_result_get_record_ok.inc(); + } + Ok(libp2p_kad::GetRecordOk::FinishedWithNoAdditionalRecord { .. }) => {} Err(error) => { self.query_result_get_record_error .get_or_create(&error.into()) @@ -200,9 +201,13 @@ impl super::Recorder for Metrics { } }, libp2p_kad::QueryResult::GetProviders(result) => match result { - Ok(ok) => self - .query_result_get_providers_ok - .observe(ok.providers.len() as f64), + Ok(libp2p_kad::GetProvidersOk::FoundProviders { providers, .. }) => { + self.query_result_get_providers_ok + .observe(providers.len() as f64); + } + Ok(libp2p_kad::GetProvidersOk::FinishedWithNoAdditionalRecord { + .. + }) => {} Err(error) => { self.query_result_get_providers_error .get_or_create(&error.into()) diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index e4e29259374..4d90652aa87 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -16,10 +16,18 @@ This would eventually lead to warning that says: "New inbound substream to PeerId exceeds inbound substream limit. No older substream waiting to be reused." See [PR 3152]. +- Refactor APIs to be streaming. + - Renamed `KademliaEvent::OutboundQueryCompleted` to `KademliaEvent::OutboundQueryProgressed` + - Instead of a single event `OutboundQueryCompleted`, there are now multiple events emitted, allowing the user to process them as they come in (via the new `OutboundQueryProgressed`). See `ProgressStep` to identify the final `OutboundQueryProgressed` of a single query. + - To finish a query early, i.e. before the final `OutboundQueryProgressed` of the query, a caller needs to call `query.finish()`. + - There is no more automatic caching of records. The user has to manually call `put_record_to` on the `QueryInfo::GetRecord.cache_candidates` to cache a record to a close peer that did not return the record on the foregone query. + See [PR 2712]. + [PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085 [PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011 [PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090 [PR 3152]: https://github.com/libp2p/rust-libp2p/pull/3152 +[PR 2712]: https://github.com/libp2p/rust-libp2p/pull/2712 # 0.41.0 diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index b113deb64d5..643f618567c 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -179,23 +179,6 @@ pub struct KademliaConfig { caching: KademliaCaching, } -/// The configuration for Kademlia "write-back" caching after successful -/// lookups via [`Kademlia::get_record`]. -#[derive(Debug, Clone)] -pub enum KademliaCaching { - /// Caching is disabled and the peers closest to records being looked up - /// that do not return a record are not tracked, i.e. - /// [`GetRecordOk::cache_candidates`] is always empty. - Disabled, - /// Up to `max_peers` peers not returning a record that are closest to the key - /// being looked up are tracked and returned in [`GetRecordOk::cache_candidates`]. - /// Furthermore, if [`Kademlia::get_record`] is used with a quorum of 1, the - /// found record is automatically sent to (i.e. cached at) these peers. For lookups with a - /// quorum > 1, the write-back operation must be performed explicitly, if - /// desired and after choosing a record from the results, via [`Kademlia::put_record_to`]. - Enabled { max_peers: u16 }, -} - impl Default for KademliaConfig { fn default() -> Self { KademliaConfig { @@ -215,6 +198,21 @@ impl Default for KademliaConfig { } } +/// The configuration for Kademlia "write-back" caching after successful +/// lookups via [`Kademlia::get_record`]. +#[derive(Debug, Clone)] +pub enum KademliaCaching { + /// Caching is disabled and the peers closest to records being looked up + /// that do not return a record are not tracked, i.e. + /// [`GetRecordOk::FinishedWithNoAdditionalRecord`] is always empty. + Disabled, + /// Up to `max_peers` peers not returning a record that are closest to the key + /// being looked up are tracked and returned in [`GetRecordOk::FinishedWithNoAdditionalRecord`]. + /// The write-back operation must be performed explicitly, if + /// desired and after choosing a record from the results, via [`Kademlia::put_record_to`]. + Enabled { max_peers: u16 }, +} + impl KademliaConfig { /// Sets custom protocol names. /// @@ -435,6 +433,7 @@ where Kademlia { store, + caching: config.caching, kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout), kbucket_inserts: config.kbucket_inserts, protocol_config: config.protocol_config, @@ -448,7 +447,6 @@ where provider_record_ttl: config.provider_record_ttl, connection_idle_timeout: config.connection_idle_timeout, local_addrs: HashSet::new(), - caching: config.caching, } } @@ -661,13 +659,15 @@ where where K: Into> + Into> + Clone, { + let target: kbucket::Key = key.clone().into(); + let key: Vec = key.into(); let info = QueryInfo::GetClosestPeers { - key: key.clone().into(), + key, + step: ProgressStep::first(), }; - let target: kbucket::Key = key.into(); - let peers = self.kbuckets.closest_keys(&target); + let peer_keys: Vec> = self.kbuckets.closest_keys(&target).collect(); let inner = QueryInner::new(info); - self.queries.add_iter_closest(target.clone(), peers, inner) + self.queries.add_iter_closest(target, peer_keys, inner) } /// Returns closest peers to the given key; takes peers from local routing table only. @@ -682,36 +682,56 @@ where /// /// The result of this operation is delivered in a /// [`KademliaEvent::OutboundQueryCompleted{QueryResult::GetRecord}`]. - pub fn get_record(&mut self, key: record::Key, quorum: Quorum) -> QueryId { - let quorum = quorum.eval(self.queries.config().replication_factor); - let mut records = Vec::with_capacity(quorum.get()); - - if let Some(record) = self.store.get(&key) { + pub fn get_record(&mut self, key: record::Key) -> QueryId { + let record = if let Some(record) = self.store.get(&key) { if record.is_expired(Instant::now()) { - self.store.remove(&key) + self.store.remove(&key); + None } else { - records.push(PeerRecord { + Some(PeerRecord { peer: None, record: record.into_owned(), - }); + }) } - } + } else { + None + }; + + let step = ProgressStep::first(); - let done = records.len() >= quorum.get(); let target = kbucket::Key::new(key.clone()); - let info = QueryInfo::GetRecord { - key, - records, - quorum, - cache_candidates: BTreeMap::new(), + let info = if record.is_some() { + QueryInfo::GetRecord { + key, + step: step.next(), + found_a_record: true, + cache_candidates: BTreeMap::new(), + } + } else { + QueryInfo::GetRecord { + key, + step: step.clone(), + found_a_record: false, + cache_candidates: BTreeMap::new(), + } }; let peers = self.kbuckets.closest_keys(&target); let inner = QueryInner::new(info); - let id = self.queries.add_iter_closest(target.clone(), peers, inner); // (*) + let id = self.queries.add_iter_closest(target.clone(), peers, inner); + + // No queries were actually done for the results yet. + let stats = QueryStats::empty(); - // Instantly finish the query if we already have enough records. - if done { - self.queries.get_mut(&id).expect("by (*)").finish(); + if let Some(record) = record { + self.queued_events + .push_back(NetworkBehaviourAction::GenerateEvent( + KademliaEvent::OutboundQueryProgressed { + id, + result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))), + step, + stats, + }, + )); } id @@ -766,19 +786,16 @@ where /// /// If the record's expiration is `None`, the configured record TTL is used. /// - /// > **Note**: This is not a regular Kademlia DHT operation. It may be + /// > **Note**: This is not a regular Kademlia DHT operation. It needs to be /// > used to selectively update or store a record to specific peers /// > for the purpose of e.g. making sure these peers have the latest /// > "version" of a record or to "cache" a record at further peers /// > to increase the lookup success rate on the DHT for other peers. /// > - /// > In particular, if lookups are performed with a quorum > 1 multiple - /// > possibly different records may be returned and the standard Kademlia + /// > In particular, there is no automatic storing of records performed, and this + /// > method must be used to ensure the standard Kademlia /// > procedure of "caching" (i.e. storing) a found record at the closest - /// > node to the key that _did not_ return it cannot be employed - /// > transparently. In that case, client code can explicitly choose - /// > which record to store at which peers for analogous write-back - /// > caching or for other reasons. + /// > node to the key that _did not_ return it. pub fn put_record_to(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId where I: ExactSizeIterator, @@ -854,6 +871,7 @@ where let info = QueryInfo::Bootstrap { peer: *local_key.preimage(), remaining: None, + step: ProgressStep::first(), }; let peers = self.kbuckets.closest_keys(&local_key).collect::>(); if peers.is_empty() { @@ -924,21 +942,49 @@ where /// The result of this operation is delivered in a /// reported via [`KademliaEvent::OutboundQueryCompleted{QueryResult::GetProviders}`]. pub fn get_providers(&mut self, key: record::Key) -> QueryId { - let providers = self + let providers: HashSet<_> = self .store .providers(&key) .into_iter() .filter(|p| !p.is_expired(Instant::now())) .map(|p| p.provider) .collect(); + + let step = ProgressStep::first(); + let info = QueryInfo::GetProviders { key: key.clone(), - providers, + providers_found: providers.len(), + step: if providers.is_empty() { + step.clone() + } else { + step.next() + }, }; - let target = kbucket::Key::new(key); + + let target = kbucket::Key::new(key.clone()); let peers = self.kbuckets.closest_keys(&target); let inner = QueryInner::new(info); - self.queries.add_iter_closest(target.clone(), peers, inner) + let id = self.queries.add_iter_closest(target.clone(), peers, inner); + + // No queries were actually done for the results yet. + let stats = QueryStats::empty(); + + if !providers.is_empty() { + self.queued_events + .push_back(NetworkBehaviourAction::GenerateEvent( + KademliaEvent::OutboundQueryProgressed { + id, + result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { + key, + providers, + })), + step, + stats, + }, + )); + } + id } /// Processes discovered peers from a successful request in an iterative `Query`. @@ -1189,7 +1235,11 @@ where log::trace!("Query {:?} finished.", query_id); let result = q.into_result(); match result.inner.info { - QueryInfo::Bootstrap { peer, remaining } => { + QueryInfo::Bootstrap { + peer, + remaining, + mut step, + } => { let local_key = self.kbuckets.local_key().clone(); let mut remaining = remaining.unwrap_or_else(|| { debug_assert_eq!(&peer, local_key.preimage()); @@ -1235,41 +1285,53 @@ where let info = QueryInfo::Bootstrap { peer: *target.preimage(), remaining: Some(remaining), + step: step.next(), }; let peers = self.kbuckets.closest_keys(&target); let inner = QueryInner::new(info); self.queries .continue_iter_closest(query_id, target.clone(), peers, inner); - } + } else { + step.last = true; + }; - Some(KademliaEvent::OutboundQueryCompleted { + Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::Bootstrap(Ok(BootstrapOk { peer, num_remaining, })), + step, }) } - QueryInfo::GetClosestPeers { key, .. } => Some(KademliaEvent::OutboundQueryCompleted { - id: query_id, - stats: result.stats, - result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk { - key, - peers: result.peers.collect(), - })), - }), + QueryInfo::GetClosestPeers { key, mut step } => { + step.last = true; - QueryInfo::GetProviders { key, providers } => { - Some(KademliaEvent::OutboundQueryCompleted { + Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, - result: QueryResult::GetProviders(Ok(GetProvidersOk { + result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk { key, - providers, - closest_peers: result.peers.collect(), + peers: result.peers.collect(), })), + step, + }) + } + + QueryInfo::GetProviders { mut step, .. } => { + step.last = true; + + Some(KademliaEvent::OutboundQueryProgressed { + id: query_id, + stats: result.stats, + result: QueryResult::GetProviders(Ok( + GetProvidersOk::FinishedWithNoAdditionalRecord { + closest_peers: result.peers.collect(), + }, + )), + step, }) } @@ -1302,65 +1364,41 @@ where .. }, } => match context { - AddProviderContext::Publish => Some(KademliaEvent::OutboundQueryCompleted { + AddProviderContext::Publish => Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: get_closest_peers_stats.merge(result.stats), result: QueryResult::StartProviding(Ok(AddProviderOk { key })), + step: ProgressStep::first_and_last(), }), - AddProviderContext::Republish => Some(KademliaEvent::OutboundQueryCompleted { + AddProviderContext::Republish => Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: get_closest_peers_stats.merge(result.stats), result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })), + step: ProgressStep::first_and_last(), }), }, QueryInfo::GetRecord { key, - records, - quorum, + mut step, + found_a_record, cache_candidates, } => { - let results = if records.len() >= quorum.get() { - // [not empty] - if quorum.get() == 1 && !cache_candidates.is_empty() { - // Cache the record at the closest node(s) to the key that - // did not return the record. - let record = records.first().expect("[not empty]").record.clone(); - let quorum = NonZeroUsize::new(1).expect("1 > 0"); - let context = PutRecordContext::Cache; - let info = QueryInfo::PutRecord { - context, - record, - quorum, - phase: PutRecordPhase::PutRecord { - success: vec![], - get_closest_peers_stats: QueryStats::empty(), - }, - }; - let inner = QueryInner::new(info); - self.queries - .add_fixed(cache_candidates.values().copied(), inner); - } - Ok(GetRecordOk { - records, - cache_candidates, - }) - } else if records.is_empty() { + step.last = true; + + let results = if found_a_record { + Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates }) + } else { Err(GetRecordError::NotFound { key, closest_peers: result.peers.collect(), }) - } else { - Err(GetRecordError::QuorumFailed { - key, - records, - quorum, - }) }; - Some(KademliaEvent::OutboundQueryCompleted { + Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::GetRecord(results), + step, }) } @@ -1407,25 +1445,23 @@ where }; match context { PutRecordContext::Publish | PutRecordContext::Custom => { - Some(KademliaEvent::OutboundQueryCompleted { + Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: get_closest_peers_stats.merge(result.stats), result: QueryResult::PutRecord(mk_result(record.key)), + step: ProgressStep::first_and_last(), }) } - PutRecordContext::Republish => Some(KademliaEvent::OutboundQueryCompleted { + PutRecordContext::Republish => Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: get_closest_peers_stats.merge(result.stats), result: QueryResult::RepublishRecord(mk_result(record.key)), + step: ProgressStep::first_and_last(), }), PutRecordContext::Replicate => { debug!("Record replicated: {:?}", record.key); None } - PutRecordContext::Cache => { - debug!("Record cached: {:?}", record.key); - None - } } } } @@ -1440,54 +1476,66 @@ where QueryInfo::Bootstrap { peer, mut remaining, + mut step, } => { let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32); - if let Some(mut remaining) = remaining.take() { - // Continue with the next bootstrap query if `remaining` is not empty. - if let Some(target) = remaining.next() { - let info = QueryInfo::Bootstrap { - peer: target.clone().into_preimage(), - remaining: Some(remaining), - }; - let peers = self.kbuckets.closest_keys(&target); - let inner = QueryInner::new(info); - self.queries - .continue_iter_closest(query_id, target.clone(), peers, inner); - } + // Continue with the next bootstrap query if `remaining` is not empty. + if let Some((target, remaining)) = + remaining.take().and_then(|mut r| Some((r.next()?, r))) + { + let info = QueryInfo::Bootstrap { + peer: target.clone().into_preimage(), + remaining: Some(remaining), + step: step.next(), + }; + let peers = self.kbuckets.closest_keys(&target); + let inner = QueryInner::new(info); + self.queries + .continue_iter_closest(query_id, target.clone(), peers, inner); + } else { + step.last = true; } - Some(KademliaEvent::OutboundQueryCompleted { + Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::Bootstrap(Err(BootstrapError::Timeout { peer, num_remaining, })), + step, }) } QueryInfo::AddProvider { context, key, .. } => Some(match context { - AddProviderContext::Publish => KademliaEvent::OutboundQueryCompleted { + AddProviderContext::Publish => KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })), + step: ProgressStep::first_and_last(), }, - AddProviderContext::Republish => KademliaEvent::OutboundQueryCompleted { + AddProviderContext::Republish => KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })), + step: ProgressStep::first_and_last(), }, }), - QueryInfo::GetClosestPeers { key } => Some(KademliaEvent::OutboundQueryCompleted { - id: query_id, - stats: result.stats, - result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout { - key, - peers: result.peers.collect(), - })), - }), + QueryInfo::GetClosestPeers { key, mut step } => { + step.last = true; + + Some(KademliaEvent::OutboundQueryProgressed { + id: query_id, + stats: result.stats, + result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout { + key, + peers: result.peers.collect(), + })), + step, + }) + } QueryInfo::PutRecord { record, @@ -1505,16 +1553,18 @@ where }); match context { PutRecordContext::Publish | PutRecordContext::Custom => { - Some(KademliaEvent::OutboundQueryCompleted { + Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::PutRecord(err), + step: ProgressStep::first_and_last(), }) } - PutRecordContext::Republish => Some(KademliaEvent::OutboundQueryCompleted { + PutRecordContext::Republish => Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::RepublishRecord(err), + step: ProgressStep::first_and_last(), }), PutRecordContext::Replicate => match phase { PutRecordPhase::GetClosestPeers => { @@ -1526,45 +1576,31 @@ where None } }, - PutRecordContext::Cache => match phase { - PutRecordPhase::GetClosestPeers => { - // Caching a record at the closest peer to a key that did not return - // a record is never preceded by a lookup for the closest peers, i.e. - // it is a direct query to a single peer. - unreachable!() - } - PutRecordPhase::PutRecord { .. } => { - debug!("Caching record failed: {:?}", err); - None - } - }, } } - QueryInfo::GetRecord { - key, - records, - quorum, - .. - } => Some(KademliaEvent::OutboundQueryCompleted { - id: query_id, - stats: result.stats, - result: QueryResult::GetRecord(Err(GetRecordError::Timeout { - key, - records, - quorum, - })), - }), + QueryInfo::GetRecord { key, mut step, .. } => { + step.last = true; - QueryInfo::GetProviders { key, providers } => { - Some(KademliaEvent::OutboundQueryCompleted { + Some(KademliaEvent::OutboundQueryProgressed { + id: query_id, + stats: result.stats, + result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })), + step, + }) + } + + QueryInfo::GetProviders { key, mut step, .. } => { + step.last = true; + + Some(KademliaEvent::OutboundQueryProgressed { id: query_id, stats: result.stats, result: QueryResult::GetProviders(Err(GetProvidersError::Timeout { key, - providers, closest_peers: result.peers.collect(), })), + step, }) } } @@ -2063,10 +2099,32 @@ where let peers = closer_peers.iter().chain(provider_peers.iter()); self.discovered(&user_data, &source, peers); if let Some(query) = self.queries.get_mut(&user_data) { - if let QueryInfo::GetProviders { providers, .. } = &mut query.inner.info { - for peer in provider_peers { - providers.insert(peer.node_id); - } + let stats = query.stats().clone(); + if let QueryInfo::GetProviders { + ref key, + ref mut providers_found, + ref mut step, + .. + } = query.inner.info + { + *providers_found += provider_peers.len(); + let providers = provider_peers.iter().map(|p| p.node_id).collect(); + + self.queued_events + .push_back(NetworkBehaviourAction::GenerateEvent( + KademliaEvent::OutboundQueryProgressed { + id: user_data, + result: QueryResult::GetProviders(Ok( + GetProvidersOk::FoundProviders { + key: key.clone(), + providers, + }, + )), + step: step.clone(), + stats, + }, + )); + *step = step.next(); } } } @@ -2138,40 +2196,34 @@ where user_data, } => { if let Some(query) = self.queries.get_mut(&user_data) { + let stats = query.stats().clone(); if let QueryInfo::GetRecord { key, - records, - quorum, + ref mut step, + ref mut found_a_record, cache_candidates, } = &mut query.inner.info { if let Some(record) = record { - records.push(PeerRecord { + *found_a_record = true; + let record = PeerRecord { peer: Some(source), record, - }); + }; - let quorum = quorum.get(); - if records.len() >= quorum { - // Desired quorum reached. The query may finish. See - // [`Query::try_finish`] for details. - let peers = records - .iter() - .filter_map(|PeerRecord { peer, .. }| peer.as_ref()) - .cloned() - .collect::>(); - let finished = query.try_finish(peers.iter()); - if !finished { - debug!( - "GetRecord query ({:?}) reached quorum ({}/{}) with \ - response from peer {} but could not yet finish.", - user_data, - peers.len(), - quorum, - source, - ); - } - } + self.queued_events + .push_back(NetworkBehaviourAction::GenerateEvent( + KademliaEvent::OutboundQueryProgressed { + id: user_data, + result: QueryResult::GetRecord(Ok( + GetRecordOk::FoundRecord(record), + )), + step: step.clone(), + stats, + }, + )); + + *step = step.next(); } else { log::trace!("Record with key {:?} not found at {}", key, source); if let KademliaCaching::Enabled { max_peers } = self.caching { @@ -2323,6 +2375,7 @@ where { query.on_success(&peer_id, vec![]) } + if self.connected_peers.contains(&peer_id) { self.queued_events .push_back(NetworkBehaviourAction::NotifyHandler { @@ -2431,14 +2484,16 @@ pub enum KademliaEvent { // is made of multiple requests across multiple remote peers. InboundRequest { request: InboundRequest }, - /// An outbound query has produced a result. - OutboundQueryCompleted { + /// An outbound query has made progress. + OutboundQueryProgressed { /// The ID of the query that finished. id: QueryId, - /// The result of the query. + /// The intermediate result of the query. result: QueryResult, /// Execution statistics from the query. stats: QueryStats, + /// Indicates which event this is, if therer are multiple responses for a single query. + step: ProgressStep, }, /// The routing table has been updated with a new peer and / or @@ -2492,6 +2547,37 @@ pub enum KademliaEvent { PendingRoutablePeer { peer: PeerId, address: Multiaddr }, } +/// Information about progress events. +#[derive(Debug, Clone)] +pub struct ProgressStep { + /// The index into the event + pub count: NonZeroUsize, + /// Is this the final event? + pub last: bool, +} + +impl ProgressStep { + fn first() -> Self { + Self { + count: NonZeroUsize::new(1).expect("1 to be greater than 0."), + last: false, + } + } + + fn first_and_last() -> Self { + let mut first = ProgressStep::first(); + first.last = true; + first + } + + fn next(&self) -> Self { + assert!(!self.last); + let count = NonZeroUsize::new(self.count.get() + 1).expect("Adding 1 not to result in 0."); + + Self { count, last: false } + } +} + /// Information about a received and handled inbound request. #[derive(Debug, Clone)] pub enum InboundRequest { @@ -2558,19 +2644,20 @@ pub type GetRecordResult = Result; /// The successful result of [`Kademlia::get_record`]. #[derive(Debug, Clone)] -pub struct GetRecordOk { - /// The records found, including the peer that returned them. - pub records: Vec, - /// If caching is enabled, these are the peers closest - /// _to the record key_ (not the local node) that were queried but - /// did not return the record, sorted by distance to the record key - /// from closest to farthest. How many of these are tracked is configured - /// by [`KademliaConfig::set_caching`]. If the lookup used a quorum of - /// 1, these peers will be sent the record as a means of caching. - /// If the lookup used a quorum > 1, you may wish to use these - /// candidates with [`Kademlia::put_record_to`] after selecting - /// one of the returned records. - pub cache_candidates: BTreeMap, +pub enum GetRecordOk { + FoundRecord(PeerRecord), + FinishedWithNoAdditionalRecord { + /// If caching is enabled, these are the peers closest + /// _to the record key_ (not the local node) that were queried but + /// did not return the record, sorted by distance to the record key + /// from closest to farthest. How many of these are tracked is configured + /// by [`KademliaConfig::set_caching`]. If the lookup used a quorum of + /// 1, these peers will be sent the record as a means of caching. + /// If the lookup used a quorum > 1, you may wish to use these + /// candidates with [`Kademlia::put_record_to`] after selecting + /// one of the returned records. + cache_candidates: BTreeMap, + }, } /// The error result of [`Kademlia::get_record`]. @@ -2588,11 +2675,7 @@ pub enum GetRecordError { quorum: NonZeroUsize, }, #[error("the request timed out")] - Timeout { - key: record::Key, - records: Vec, - quorum: NonZeroUsize, - }, + Timeout { key: record::Key }, } impl GetRecordError { @@ -2722,10 +2805,15 @@ pub type GetProvidersResult = Result; /// The successful result of [`Kademlia::get_providers`]. #[derive(Debug, Clone)] -pub struct GetProvidersOk { - pub key: record::Key, - pub providers: HashSet, - pub closest_peers: Vec, +pub enum GetProvidersOk { + FoundProviders { + key: record::Key, + /// The new set of providers discovered. + providers: HashSet, + }, + FinishedWithNoAdditionalRecord { + closest_peers: Vec, + }, } /// The error result of [`Kademlia::get_providers`]. @@ -2734,7 +2822,6 @@ pub enum GetProvidersError { #[error("the request timed out")] Timeout { key: record::Key, - providers: HashSet, closest_peers: Vec, }, } @@ -2847,11 +2934,6 @@ pub enum PutRecordContext { /// The context is periodic replication (i.e. without extending /// the record TTL) of stored records received earlier from another peer. Replicate, - /// The context is an automatic write-back caching operation of a - /// record found via [`Kademlia::get_record`] at the closest node - /// to the key queried that did not return a record. This only - /// occurs after a lookup quorum of 1 as per standard Kademlia. - Cache, /// The context is a custom store operation targeting specific /// peers initiated by [`Kademlia::put_record_to`]. Custom, @@ -2871,17 +2953,25 @@ pub enum QueryInfo { /// yet completed and `Some` with an exhausted iterator /// if bootstrapping is complete. remaining: Option>>, + step: ProgressStep, }, - /// A query initiated by [`Kademlia::get_closest_peers`]. - GetClosestPeers { key: Vec }, + /// A (repeated) query initiated by [`Kademlia::get_closest_peers`]. + GetClosestPeers { + /// The key being queried (the preimage). + key: Vec, + /// Current index of events. + step: ProgressStep, + }, - /// A query initiated by [`Kademlia::get_providers`]. + /// A (repeated) query initiated by [`Kademlia::get_providers`]. GetProviders { /// The key for which to search for providers. key: record::Key, - /// The found providers. - providers: HashSet, + /// The number of providers found so far. + providers_found: usize, + /// Current index of events. + step: ProgressStep, }, /// A (repeated) query initiated by [`Kademlia::start_providing`]. @@ -2905,15 +2995,14 @@ pub enum QueryInfo { context: PutRecordContext, }, - /// A query initiated by [`Kademlia::get_record`]. + /// A (repeated) query initiated by [`Kademlia::get_record`]. GetRecord { /// The key to look for. key: record::Key, - /// The records with the id of the peer that returned them. `None` when - /// the record was found in the local store. - records: Vec, - /// The number of records to look for. - quorum: NonZeroUsize, + /// Current index of events. + step: ProgressStep, + /// Did we find at least one record? + found_a_record: bool, /// The peers closest to the `key` that were queried but did not return a record, /// i.e. the peers that are candidates for caching the record. cache_candidates: BTreeMap, diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 5af60dc0c29..00f8816e52a 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -188,7 +188,7 @@ fn bootstrap() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::Bootstrap(Ok(ok)), .. @@ -257,8 +257,9 @@ fn query_iter() { match swarms[0].behaviour_mut().query(&qid) { Some(q) => match q.info() { - QueryInfo::GetClosestPeers { key } => { - assert_eq!(&key[..], search_target.to_bytes().as_slice()) + QueryInfo::GetClosestPeers { key, step } => { + assert_eq!(&key[..], search_target.to_bytes().as_slice()); + assert_eq!(usize::from(step.count), 1); } i => panic!("Unexpected query info: {:?}", i), }, @@ -277,7 +278,7 @@ fn query_iter() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::GetClosestPeers(Ok(ok)), .. @@ -336,7 +337,7 @@ fn unresponsive_not_returned_direct() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { result: QueryResult::GetClosestPeers(Ok(ok)), .. }, @@ -396,7 +397,7 @@ fn unresponsive_not_returned_indirect() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { result: QueryResult::GetClosestPeers(Ok(ok)), .. }, @@ -444,16 +445,14 @@ fn get_record_not_found() { .collect::>(); let target_key = record::Key::from(random_multihash()); - let qid = swarms[0] - .behaviour_mut() - .get_record(target_key.clone(), Quorum::One); + let qid = swarms[0].behaviour_mut().get_record(target_key.clone()); block_on(poll_fn(move |ctx| { for swarm in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::GetRecord(Err(e)), .. @@ -573,17 +572,19 @@ fn put_record() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::PutRecord(res), stats, + step: index, }, ))) | Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::RepublishRecord(res), stats, + step: index, }, ))) => { assert!(qids.is_empty() || qids.remove(&id)); @@ -591,6 +592,8 @@ fn put_record() { assert!(stats.num_successes() >= replication_factor.get() as u32); assert!(stats.num_requests() >= stats.num_successes()); assert_eq!(stats.num_failures(), 0); + assert_eq!(usize::from(index.count), 1); + assert!(index.last); match res { Err(e) => panic!("{:?}", e), Ok(ok) => { @@ -755,36 +758,35 @@ fn get_record() { let record = Record::new(random_multihash(), vec![4, 5, 6]); - let expected_cache_candidate = *Swarm::local_peer_id(&swarms[1]); - swarms[2].behaviour_mut().store.put(record.clone()).unwrap(); - let qid = swarms[0] - .behaviour_mut() - .get_record(record.key.clone(), Quorum::One); + let qid = swarms[0].behaviour_mut().get_record(record.key.clone()); block_on(poll_fn(move |ctx| { for swarm in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, - result: - QueryResult::GetRecord(Ok(GetRecordOk { - records, - cache_candidates, - })), + result: QueryResult::GetRecord(Ok(r)), + step: ProgressStep { count, last }, .. }, ))) => { assert_eq!(id, qid); - assert_eq!(records.len(), 1); - assert_eq!(records.first().unwrap().record, record); - assert_eq!(cache_candidates.len(), 1); - assert_eq!( - cache_candidates.values().next(), - Some(&expected_cache_candidate) - ); + if usize::from(count) == 1 { + assert!(!last); + assert!(matches!(r, GetRecordOk::FoundRecord(_))); + if let GetRecordOk::FoundRecord(r) = r { + assert_eq!(r.record, record); + } + } else if last { + assert_eq!(usize::from(count), 2); + assert!(matches!( + r, + GetRecordOk::FinishedWithNoAdditionalRecord { .. } + )); + } return Poll::Ready(()); } // Ignore any other event. @@ -816,25 +818,34 @@ fn get_record_many() { } let quorum = Quorum::N(NonZeroUsize::new(num_results).unwrap()); - let qid = swarms[0] - .behaviour_mut() - .get_record(record.key.clone(), quorum); + let qid = swarms[0].behaviour_mut().get_record(record.key.clone()); block_on(poll_fn(move |ctx| { - for swarm in &mut swarms { + for (i, swarm) in swarms.iter_mut().enumerate() { + let mut records = Vec::new(); + let quorum = quorum.eval(swarm.behaviour().queries.config().replication_factor); loop { + if i == 0 && records.len() >= quorum.get() { + swarm.behaviour_mut().query_mut(&qid).unwrap().finish(); + } match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, - result: QueryResult::GetRecord(Ok(GetRecordOk { records, .. })), + result: QueryResult::GetRecord(Ok(r)), + step: ProgressStep { count: _, last }, .. }, ))) => { assert_eq!(id, qid); - assert!(records.len() >= num_results); - assert!(records.into_iter().all(|r| r.record == record)); - return Poll::Ready(()); + if let GetRecordOk::FoundRecord(r) = r { + assert_eq!(r.record, record); + records.push(r); + } + + if last { + return Poll::Ready(()); + } } // Ignore any other event. Poll::Ready(Some(_)) => (), @@ -913,14 +924,14 @@ fn add_provider() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::StartProviding(res), .. }, ))) | Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { id, result: QueryResult::RepublishProvider(res), .. @@ -1051,7 +1062,7 @@ fn exceed_jobs_max_queries() { loop { if let Poll::Ready(Some(e)) = swarm.poll_next_unpin(ctx) { match e { - SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted { + SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed { result: QueryResult::GetClosestPeers(Ok(r)), .. }) => break assert!(r.peers.is_empty()), @@ -1097,12 +1108,7 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { // Make `bob` and `trudy` aware of their version of the record searched by // `alice`. bob.1.behaviour_mut().store.put(record_bob.clone()).unwrap(); - trudy - .1 - .behaviour_mut() - .store - .put(record_trudy.clone()) - .unwrap(); + trudy.1.behaviour_mut().store.put(record_trudy).unwrap(); // Make `trudy` and `bob` known to `alice`. alice @@ -1118,7 +1124,7 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { let (mut alice, mut bob, mut trudy) = (alice.1, bob.1, trudy.1); // Have `alice` query the Dht for `key` with a quorum of 1. - alice.behaviour_mut().get_record(key, Quorum::One); + alice.behaviour_mut().get_record(key); // The default peer timeout is 10 seconds. Choosing 1 seconds here should // give enough head room to prevent connections to `bob` to time out. @@ -1126,25 +1132,32 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { // Poll only `alice` and `trudy` expecting `alice` not yet to return a query // result as it is not able to connect to `bob` just yet. + let addr_trudy = *Swarm::local_peer_id(&trudy); block_on(poll_fn(|ctx| { for (i, swarm) in [&mut alice, &mut trudy].iter_mut().enumerate() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { result: QueryResult::GetRecord(result), + step, .. }, ))) => { if i != 0 { panic!("Expected `QueryResult` from Alice.") } - - match result { - Ok(_) => panic!( + if step.last { + panic!( "Expected query not to finish until all \ - disjoint paths have been explored.", - ), + disjoint paths have been explored.", + ); + } + match result { + Ok(GetRecordOk::FoundRecord(r)) => { + assert_eq!(r.peer, Some(addr_trudy)); + } + Ok(_) => {} Err(e) => panic!("{:?}", e), } } @@ -1162,19 +1175,14 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { // Make sure `alice` has exactly one query with `trudy`'s record only. assert_eq!(1, alice.behaviour().queries.iter().count()); + alice .behaviour() .queries .iter() .for_each(|q| match &q.inner.info { - QueryInfo::GetRecord { records, .. } => { - assert_eq!( - *records, - vec![PeerRecord { - peer: Some(*Swarm::local_peer_id(&trudy)), - record: record_trudy.clone(), - }], - ); + QueryInfo::GetRecord { step, .. } => { + assert_eq!(usize::from(step.count), 2); } i => panic!("Unexpected query info: {:?}", i), }); @@ -1182,21 +1190,32 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { // Poll `alice` and `bob` expecting `alice` to return a successful query // result as it is now able to explore the second disjoint path. let records = block_on(poll_fn(|ctx| { + let mut records = Vec::new(); for (i, swarm) in [&mut alice, &mut bob].iter_mut().enumerate() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(SwarmEvent::Behaviour( - KademliaEvent::OutboundQueryCompleted { + KademliaEvent::OutboundQueryProgressed { result: QueryResult::GetRecord(result), + step, .. }, ))) => { if i != 0 { panic!("Expected `QueryResult` from Alice.") } - match result { - Ok(ok) => return Poll::Ready(ok.records), + Ok(ok) => { + if let GetRecordOk::FoundRecord(record) = ok { + records.push(record); + } + if records.len() == 1 { + return Poll::Ready(records); + } + if step.last { + break; + } + } Err(e) => unreachable!("{:?}", e), } } @@ -1211,15 +1230,11 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { Poll::Pending })); - assert_eq!(2, records.len()); + assert_eq!(1, records.len()); assert!(records.contains(&PeerRecord { peer: Some(*Swarm::local_peer_id(&bob)), record: record_bob, })); - assert!(records.contains(&PeerRecord { - peer: Some(*Swarm::local_peer_id(&trudy)), - record: record_trudy, - })); } /// Tests that peers are not automatically inserted into @@ -1339,7 +1354,7 @@ fn network_behaviour_on_address_change() { } #[test] -fn get_providers() { +fn get_providers_single() { fn prop(key: record::Key) { let (_, mut single_swarm) = build_node(); single_swarm @@ -1349,7 +1364,7 @@ fn get_providers() { block_on(async { match single_swarm.next().await.unwrap() { - SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted { + SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed { result: QueryResult::StartProviding(Ok(_)), .. }) => {} @@ -1358,30 +1373,137 @@ fn get_providers() { } }); - let query_id = single_swarm.behaviour_mut().get_providers(key.clone()); + let query_id = single_swarm.behaviour_mut().get_providers(key); block_on(async { - match single_swarm.next().await.unwrap() { - SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted { - id, - result: - QueryResult::GetProviders(Ok(GetProvidersOk { - key: found_key, - providers, - .. - })), - .. - }) if id == query_id => { - assert_eq!(key, found_key); - assert_eq!( - single_swarm.local_peer_id(), - providers.iter().next().unwrap() - ); + loop { + match single_swarm.next().await.unwrap() { + SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed { + id, + result: QueryResult::GetProviders(Ok(ok)), + step: index, + .. + }) if id == query_id => { + if index.last { + assert!(matches!( + ok, + GetProvidersOk::FinishedWithNoAdditionalRecord { .. } + )); + break; + } else { + assert!(matches!(ok, GetProvidersOk::FoundProviders { .. })); + if let GetProvidersOk::FoundProviders { providers, .. } = ok { + assert_eq!(providers.len(), 1); + assert!(providers.contains(single_swarm.local_peer_id())); + } + } + } + SwarmEvent::Behaviour(e) => panic!("Unexpected event: {:?}", e), + _ => {} } - SwarmEvent::Behaviour(e) => panic!("Unexpected event: {:?}", e), - _ => {} } }); } QuickCheck::new().tests(10).quickcheck(prop as fn(_)) } + +fn get_providers_limit() { + fn prop(key: record::Key) { + let mut swarms = build_nodes(3); + + // Let first peer know of second peer and second peer know of third peer. + for i in 0..2 { + let (peer_id, address) = ( + *Swarm::local_peer_id(&swarms[i + 1].1), + swarms[i + 1].0.clone(), + ); + swarms[i].1.behaviour_mut().add_address(&peer_id, address); + } + + // Drop the swarm addresses. + let mut swarms = swarms + .into_iter() + .map(|(_addr, swarm)| swarm) + .collect::>(); + + // Provide the content on peer 2 and 3. + for swarm in swarms.iter_mut().take(3).skip(1) { + swarm + .behaviour_mut() + .start_providing(key.clone()) + .expect("could not provide"); + } + + // Query with expecting a single provider. + let query_id = swarms[0].behaviour_mut().get_providers(key.clone()); + + let mut all_providers: Vec = vec![]; + + block_on(poll_fn(move |ctx| { + for (i, swarm) in swarms.iter_mut().enumerate() { + loop { + match swarm.poll_next_unpin(ctx) { + Poll::Ready(Some(SwarmEvent::Behaviour( + KademliaEvent::OutboundQueryProgressed { + id, + result: QueryResult::GetProviders(Ok(ok)), + step: index, + .. + }, + ))) if i == 0 && id == query_id => { + if index.last { + assert!(matches!( + ok, + GetProvidersOk::FinishedWithNoAdditionalRecord { .. } + )); + assert_eq!(all_providers.len(), N); + return Poll::Ready(()); + } else { + assert!(matches!(ok, GetProvidersOk::FoundProviders { .. })); + if let GetProvidersOk::FoundProviders { + key: found_key, + providers, + } = ok + { + // There are a total of 2 providers. + assert_eq!(key, found_key); + for provider in &providers { + // Providers should be either 2 or 3 + assert_ne!(swarm.local_peer_id(), provider); + } + all_providers.extend(providers); + + // If we have all providers, finish. + if all_providers.len() == N { + swarm.behaviour_mut().query_mut(&id).unwrap().finish(); + } + } + return Poll::Ready(()); + } + } + Poll::Ready(..) => {} + Poll::Pending => break, + } + } + } + Poll::Pending + })); + } + + QuickCheck::new().tests(10).quickcheck(prop:: as fn(_)) +} + +#[test] +fn get_providers_limit_n_1() { + get_providers_limit::<1>(); +} + +#[test] +fn get_providers_limit_n_2() { + get_providers_limit::<2>(); +} + +#[test] +fn get_providers_limit_n_5() { + get_providers_limit::<5>(); +} diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs index 76861665b3f..e712da0c7d3 100644 --- a/protocols/kad/src/lib.rs +++ b/protocols/kad/src/lib.rs @@ -64,7 +64,7 @@ pub use behaviour::{ }; pub use behaviour::{ Kademlia, KademliaBucketInserts, KademliaCaching, KademliaConfig, KademliaEvent, - KademliaStoreInserts, Quorum, + KademliaStoreInserts, ProgressStep, Quorum, }; pub use protocol::KadConnectionType; pub use query::QueryId;