diff --git a/client/authority-discovery/src/error.rs b/client/authority-discovery/src/error.rs index b1358485c37ee..48bcdf33114b1 100644 --- a/client/authority-discovery/src/error.rs +++ b/client/authority-discovery/src/error.rs @@ -34,10 +34,8 @@ pub enum Error { HashingAuthorityId(libp2p::core::multiaddr::multihash::EncodeError), /// Failed calling into the Substrate runtime. CallingRuntime(sp_blockchain::Error), - /// From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure it - /// is actually an authority, we match the hash against the hash of the authority id of all other authorities. This - /// error is the result of the above failing. - MatchingHashedAuthorityIdWithAuthorityId, + /// Received a dht record with a key that does not match any in-flight awaited keys. + ReceivingUnexpectedRecord, /// Failed to set the authority discovery peerset priority group in the peerset module. SettingPeersetPriorityGroup(String), /// Failed to encode a protobuf payload. diff --git a/client/authority-discovery/src/worker.rs b/client/authority-discovery/src/worker.rs index 629ea4fb2f423..ff4d12dadd988 100644 --- a/client/authority-discovery/src/worker.rs +++ b/client/authority-discovery/src/worker.rs @@ -35,6 +35,7 @@ use libp2p::{core::multiaddr, multihash::Multihash}; use log::{debug, error, log_enabled}; use prometheus_endpoint::{Counter, CounterVec, Gauge, Opts, U64, register}; use prost::Message; +use rand::{seq::SliceRandom, thread_rng}; use sc_client_api::blockchain::HeaderBackend; use sc_network::{ config::MultiaddrWithPeerId, @@ -70,6 +71,9 @@ const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities"; /// Maximum number of addresses cached per authority. Additional addresses are discarded. const MAX_ADDRESSES_PER_AUTHORITY: usize = 10; +/// Maximum number of in-flight DHT lookups at any given point in time. +const MAX_IN_FLIGHT_LOOKUPS: usize = 8; + /// Role an authority discovery module can run as. pub enum Role { /// Actual authority as well as a reference to its key store. @@ -137,12 +141,17 @@ where /// Interval to be proactive, publishing own addresses. publish_interval: Interval, - /// Interval on which to query for addresses of other authorities. + /// Interval at which to request addresses of authorities, refilling the pending lookups queue. query_interval: Interval, /// Interval on which to set the peerset priority group to a new random /// set of addresses. priority_group_set_interval: Interval, + /// Queue of throttled lookups pending to be passed to the network. + pending_lookups: Vec, + /// Set of in-flight lookups. + in_flight_lookups: HashMap, + addr_cache: addr_cache::AddrCache, metrics: Option, @@ -183,8 +192,8 @@ where Duration::from_secs(12 * 60 * 60), ); - // External addresses of other authorities can change at any given point in time. The - // interval on which to query for external addresses of other authorities is a trade off + // External addresses of remote authorities can change at any given point in time. The + // interval on which to trigger new queries for the current authorities is a trade off // between efficiency and performance. let query_interval_start = Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME; let query_interval_duration = Duration::from_secs(10 * 60); @@ -193,9 +202,9 @@ where // Querying 500 [`AuthorityId`]s takes ~1m on the Kusama DHT (10th of August 2020) when // comparing `authority_discovery_authority_addresses_requested_total` and // `authority_discovery_dht_event_received`. With that in mind set the peerset priority - // group on the same interval as the [`query_interval`] above, just delayed by 2 minutes. + // group on the same interval as the [`query_interval`] above, just delayed by 5 minutes. let priority_group_set_interval = interval_at( - query_interval_start + Duration::from_secs(2 * 60), + query_interval_start + Duration::from_secs(5 * 60), query_interval_duration, ); @@ -229,6 +238,8 @@ where publish_interval, query_interval, priority_group_set_interval, + pending_lookups: Vec::new(), + in_flight_lookups: HashMap::new(), addr_cache, role, metrics, @@ -270,7 +281,9 @@ where if let Some(metrics) = &self.metrics { metrics.publish.inc(); - metrics.amount_last_published.set(addresses.len() as u64); + metrics.amount_addresses_last_published.set( + addresses.len().try_into().unwrap_or(std::u64::MAX), + ); } let mut serialized_addresses = vec![]; @@ -314,15 +327,9 @@ where Ok(()) } - fn request_addresses_of_others(&mut self) -> Result<()> { + fn refill_pending_lookups_queue(&mut self) -> Result<()> { let id = BlockId::hash(self.client.info().best_hash); - let authorities = self - .client - .runtime_api() - .authorities(&id) - .map_err(Error::CallingRuntime)?; - let local_keys = match &self.role { Role::Authority(key_store) => { key_store.read() @@ -333,21 +340,52 @@ where Role::Sentry => HashSet::new(), }; - for authority_id in authorities.iter() { - // Make sure we don't look up our own keys. - if !local_keys.contains(authority_id.as_ref()) { - if let Some(metrics) = &self.metrics { - metrics.request.inc(); - } + let mut authorities = self + .client + .runtime_api() + .authorities(&id) + .map_err(Error::CallingRuntime)? + .into_iter() + .filter(|id| !local_keys.contains(id.as_ref())) + .collect(); - self.network - .get_value(&hash_authority_id(authority_id.as_ref())); - } + self.addr_cache.retain_ids(&authorities); + + authorities.shuffle(&mut thread_rng()); + self.pending_lookups = authorities; + // Ignore all still in-flight lookups. Those that are still in-flight are likely stalled as + // query interval ticks are far enough apart for all lookups to succeed. + self.in_flight_lookups.clear(); + + if let Some(metrics) = &self.metrics { + metrics.requests_pending.set( + self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX), + ); } Ok(()) } + fn start_new_lookups(&mut self) { + while self.in_flight_lookups.len() < MAX_IN_FLIGHT_LOOKUPS { + let authority_id = match self.pending_lookups.pop() { + Some(authority) => authority, + None => return, + }; + let hash = hash_authority_id(authority_id.as_ref()); + self.network + .get_value(&hash); + self.in_flight_lookups.insert(hash, authority_id); + + if let Some(metrics) = &self.metrics { + metrics.requests.inc(); + metrics.requests_pending.set( + self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX), + ); + } + } + } + /// Handle incoming Dht events. /// /// Returns either: @@ -385,10 +423,17 @@ where metrics.dht_event_received.with_label_values(&["value_not_found"]).inc(); } - debug!( - target: LOG_TARGET, - "Value for hash '{:?}' not found on Dht.", hash - ) + if self.in_flight_lookups.remove(&hash).is_some() { + debug!( + target: LOG_TARGET, + "Value for hash '{:?}' not found on Dht.", hash + ) + } else { + debug!( + target: LOG_TARGET, + "Received 'ValueNotFound' for unexpected hash '{:?}'.", hash + ) + } }, Some(DhtEvent::ValuePut(hash)) => { if let Some(metrics) = &self.metrics { @@ -434,23 +479,9 @@ where } })?.ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?; - let authorities = { - let block_id = BlockId::hash(self.client.info().best_hash); - // From the Dht we only get the hashed authority id. In order to retrieve the actual - // authority id and to ensure it is actually an authority, we match the hash against the - // hash of the authority id of all other authorities. - let authorities = self.client.runtime_api().authorities(&block_id)?; - self.addr_cache.retain_ids(&authorities); - authorities - .into_iter() - .map(|id| (hash_authority_id(id.as_ref()), id)) - .collect::>() - }; - - // Check if the event origins from an authority in the current or next authority set. - let authority_id: &AuthorityId = authorities - .get(&remote_key) - .ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?; + let authority_id: AuthorityId = self.in_flight_lookups + .remove(&remote_key) + .ok_or(Error::ReceivingUnexpectedRecord)?; let local_peer_id = self.network.local_peer_id(); @@ -463,7 +494,7 @@ where let signature = AuthoritySignature::decode(&mut &signature[..]) .map_err(Error::EncodingDecodingScale)?; - if !AuthorityPair::verify(&signature, &addresses, authority_id) { + if !AuthorityPair::verify(&signature, &addresses, &authority_id) { return Err(Error::VerifyingDhtPayload); } @@ -503,7 +534,7 @@ where .collect(); if !remote_addresses.is_empty() { - self.addr_cache.insert(authority_id.clone(), remote_addresses); + self.addr_cache.insert(authority_id, remote_addresses); if let Some(metrics) = &self.metrics { metrics.known_authorities_count.set( self.addr_cache.num_ids().try_into().unwrap_or(std::u64::MAX) @@ -610,15 +641,15 @@ where } } - // Request addresses of authorities. + // Request addresses of authorities, refilling the pending lookups queue. if let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) { // Register waker of underlying task for next interval. while let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {} - if let Err(e) = self.request_addresses_of_others() { + if let Err(e) = self.refill_pending_lookups_queue() { error!( target: LOG_TARGET, - "Failed to request addresses of authorities: {:?}", e, + "Failed to refill pending lookups queue: {:?}", e, ); } } @@ -652,6 +683,8 @@ where } } + self.start_new_lookups(); + Poll::Pending } } @@ -712,8 +745,9 @@ fn interval_at(start: Instant, duration: Duration) -> Interval { #[derive(Clone)] pub(crate) struct Metrics { publish: Counter, - amount_last_published: Gauge, - request: Counter, + amount_addresses_last_published: Gauge, + requests: Counter, + requests_pending: Gauge, dht_event_received: CounterVec, handle_value_found_event_failure: Counter, known_authorities_count: Gauge, @@ -730,7 +764,7 @@ impl Metrics { )?, registry, )?, - amount_last_published: register( + amount_addresses_last_published: register( Gauge::new( "authority_discovery_amount_external_addresses_last_published", "Number of external addresses published when authority discovery last \ @@ -738,7 +772,7 @@ impl Metrics { )?, registry, )?, - request: register( + requests: register( Counter::new( "authority_discovery_authority_addresses_requested_total", "Number of times authority discovery has requested external addresses of a \ @@ -746,6 +780,13 @@ impl Metrics { )?, registry, )?, + requests_pending: register( + Gauge::new( + "authority_discovery_authority_address_requests_pending", + "Number of pending authority address requests." + )?, + registry, + )?, dht_event_received: register( CounterVec::new( Opts::new( diff --git a/client/authority-discovery/src/worker/tests.rs b/client/authority-discovery/src/worker/tests.rs index baa6bd0fc7d62..28192283054d1 100644 --- a/client/authority-discovery/src/worker/tests.rs +++ b/client/authority-discovery/src/worker/tests.rs @@ -221,6 +221,41 @@ impl NetworkStateInfo for TestNetwork { } } +fn build_dht_event( + addresses: Vec, + public_key: AuthorityId, + key_store: &BareCryptoStorePtr, +) -> (libp2p::kad::record::Key, Vec) { + let mut serialized_addresses = vec![]; + schema::AuthorityAddresses { + addresses: addresses.into_iter().map(|a| a.to_vec()).collect() + }.encode(&mut serialized_addresses) + .map_err(Error::EncodingProto) + .unwrap(); + + let signature = key_store.read() + .sign_with( + key_types::AUTHORITY_DISCOVERY, + &public_key.clone().into(), + serialized_addresses.as_slice(), + ) + .map_err(|_| Error::Signing) + .unwrap(); + + let mut signed_addresses = vec![]; + schema::SignedAuthorityAddresses { + addresses: serialized_addresses.clone(), + signature, + } + .encode(&mut signed_addresses) + .map_err(Error::EncodingProto) + .unwrap(); + + let key = hash_authority_id(&public_key.to_raw_vec()); + let value = signed_addresses; + (key, value) +} + #[test] fn new_registers_metrics() { let (_dht_event_tx, dht_event_rx) = channel(1000); @@ -247,7 +282,7 @@ fn new_registers_metrics() { } #[test] -fn request_addresses_of_others_triggers_dht_get_query() { +fn triggers_dht_get_query() { let _ = ::env_logger::try_init(); let (_dht_event_tx, dht_event_rx) = channel(1000); @@ -262,7 +297,6 @@ fn request_addresses_of_others_triggers_dht_get_query() { let network: Arc = Arc::new(Default::default()); let key_store = KeyStore::new(); - let (_to_worker, from_service) = mpsc::channel(0); let mut worker = Worker::new( from_service, @@ -274,7 +308,12 @@ fn request_addresses_of_others_triggers_dht_get_query() { None, ); - worker.request_addresses_of_others().unwrap(); + worker.refill_pending_lookups_queue().unwrap(); + + futures::executor::block_on(futures::future::poll_fn(|cx| { + assert_eq!(Poll::Pending, worker.poll_unpin(cx)); + Poll::Ready(()) + })); // Expect authority discovery to request new records from the dht. assert_eq!(network.get_value_call.lock().unwrap().len(), 2); @@ -352,6 +391,9 @@ fn publish_discover_cycle() { dht_event_tx.try_send(dht_event).unwrap(); let f = |cx: &mut Context<'_>| -> Poll<()> { + worker.refill_pending_lookups_queue().unwrap(); + worker.start_new_lookups(); + // Make authority discovery handle the event. if let Poll::Ready(e) = worker.handle_dht_events(cx) { panic!("Unexpected error: {:?}", e); @@ -547,40 +589,11 @@ fn never_add_own_address_to_priority_group() { )) }; - let dht_event = { - let addresses = vec![ - sentry_multiaddr.to_vec(), - random_multiaddr.to_vec(), - ]; - - let mut serialized_addresses = vec![]; - schema::AuthorityAddresses { addresses } - .encode(&mut serialized_addresses) - .map_err(Error::EncodingProto) - .unwrap(); - - let signature = validator_key_store.read() - .sign_with( - key_types::AUTHORITY_DISCOVERY, - &validator_public.clone().into(), - serialized_addresses.as_slice(), - ) - .map_err(|_| Error::Signing) - .unwrap(); - - let mut signed_addresses = vec![]; - schema::SignedAuthorityAddresses { - addresses: serialized_addresses.clone(), - signature, - } - .encode(&mut signed_addresses) - .map_err(Error::EncodingProto) - .unwrap(); - - let key = hash_authority_id(&validator_public.to_raw_vec()); - let value = signed_addresses; - (key, value) - }; + let dht_event = build_dht_event( + vec![sentry_multiaddr, random_multiaddr.clone()], + validator_public.into(), + &validator_key_store, + ); let (_dht_event_tx, dht_event_rx) = channel(1); let sentry_test_api = Arc::new(TestApi { @@ -599,6 +612,9 @@ fn never_add_own_address_to_priority_group() { None, ); + sentry_worker.refill_pending_lookups_queue().unwrap(); + sentry_worker.start_new_lookups(); + sentry_worker.handle_dht_value_found_event(vec![dht_event]).unwrap(); sentry_worker.set_priority_group().unwrap(); @@ -625,43 +641,19 @@ fn limit_number_of_addresses_added_to_cache_per_authority() { .sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None) .unwrap(); - let dht_event = { - let addresses = (0..100).map(|_| { - let peer_id = PeerId::random(); - let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap(); - address.with(multiaddr::Protocol::P2p( - peer_id.into(), - )).to_vec() - }).collect(); - - let mut serialized_addresses = vec![]; - schema::AuthorityAddresses { addresses } - .encode(&mut serialized_addresses) - .map_err(Error::EncodingProto) - .unwrap(); - - let signature = remote_key_store.read() - .sign_with( - key_types::AUTHORITY_DISCOVERY, - &remote_public.clone().into(), - serialized_addresses.as_slice(), - ) - .map_err(|_| Error::Signing) - .unwrap(); - - let mut signed_addresses = vec![]; - schema::SignedAuthorityAddresses { - addresses: serialized_addresses.clone(), - signature, - } - .encode(&mut signed_addresses) - .map_err(Error::EncodingProto) - .unwrap(); + let addresses = (0..100).map(|_| { + let peer_id = PeerId::random(); + let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap(); + address.with(multiaddr::Protocol::P2p( + peer_id.into(), + )) + }).collect(); - let key = hash_authority_id(&remote_public.to_raw_vec()); - let value = signed_addresses; - (key, value) - }; + let dht_event = build_dht_event( + addresses, + remote_public.into(), + &remote_key_store, + ); let (_dht_event_tx, dht_event_rx) = channel(1); @@ -676,6 +668,9 @@ fn limit_number_of_addresses_added_to_cache_per_authority() { None, ); + worker.refill_pending_lookups_queue().unwrap(); + worker.start_new_lookups(); + worker.handle_dht_value_found_event(vec![dht_event]).unwrap(); assert_eq!( MAX_ADDRESSES_PER_AUTHORITY, @@ -700,40 +695,14 @@ fn do_not_cache_addresses_without_peer_id() { let multiaddr_without_peer_id: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap(); - let dht_event = { - let addresses = vec![ - multiaddr_with_peer_id.to_vec(), - multiaddr_without_peer_id.to_vec(), - ]; - - let mut serialized_addresses = vec![]; - schema::AuthorityAddresses { addresses } - .encode(&mut serialized_addresses) - .map_err(Error::EncodingProto) - .unwrap(); - - let signature = remote_key_store.read() - .sign_with( - key_types::AUTHORITY_DISCOVERY, - &remote_public.clone().into(), - serialized_addresses.as_slice(), - ) - .map_err(|_| Error::Signing) - .unwrap(); - - let mut signed_addresses = vec![]; - schema::SignedAuthorityAddresses { - addresses: serialized_addresses.clone(), - signature, - } - .encode(&mut signed_addresses) - .map_err(Error::EncodingProto) - .unwrap(); - - let key = hash_authority_id(&remote_public.to_raw_vec()); - let value = signed_addresses; - (key, value) - }; + let dht_event = build_dht_event( + vec![ + multiaddr_with_peer_id.clone(), + multiaddr_without_peer_id, + ], + remote_public.into(), + &remote_key_store, + ); let (_dht_event_tx, dht_event_rx) = channel(1); let local_test_api = Arc::new(TestApi { @@ -754,6 +723,9 @@ fn do_not_cache_addresses_without_peer_id() { None, ); + local_worker.refill_pending_lookups_queue().unwrap(); + local_worker.start_new_lookups(); + local_worker.handle_dht_value_found_event(vec![dht_event]).unwrap(); assert_eq!( @@ -826,3 +798,83 @@ fn addresses_to_publish_respects_existing_p2p_protocol() { "Expected Multiaddr from `TestNetwork` to not be altered.", ); } + +#[test] +fn lookup_throttling() { + let remote_multiaddr = { + let peer_id = PeerId::random(); + let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap(); + + address.with(multiaddr::Protocol::P2p( + peer_id.into(), + )) + }; + let remote_key_store = KeyStore::new(); + let remote_public_keys: Vec = (0..20).map(|_| { + remote_key_store + .write() + .sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None) + .unwrap().into() + }).collect(); + let remote_hash_to_key = remote_public_keys.iter() + .map(|k| (hash_authority_id(k.as_ref()), k.clone())) + .collect::>(); + + + let (mut dht_event_tx, dht_event_rx) = channel(1); + let (_to_worker, from_service) = mpsc::channel(0); + let network = Arc::new(TestNetwork::default()); + let mut worker = Worker::new( + from_service, + Arc::new(TestApi { authorities: remote_public_keys.clone() }), + network.clone(), + vec![], + dht_event_rx.boxed(), + Role::Sentry, + None, + ); + + futures::executor::block_on(futures::future::poll_fn(|cx| { + worker.refill_pending_lookups_queue().unwrap(); + + // Assert worker to trigger MAX_IN_FLIGHT_LOOKUPS lookups. + assert_eq!(Poll::Pending, worker.poll_unpin(cx)); + assert_eq!(worker.pending_lookups.len(), remote_public_keys.len() - MAX_IN_FLIGHT_LOOKUPS); + assert_eq!(worker.in_flight_lookups.len(), MAX_IN_FLIGHT_LOOKUPS); + assert_eq!(network.get_value_call.lock().unwrap().len(), MAX_IN_FLIGHT_LOOKUPS); + + // Make first lookup succeed. + let remote_hash = network.get_value_call.lock().unwrap().pop().unwrap(); + let remote_key: AuthorityId = remote_hash_to_key.get(&remote_hash).unwrap().clone(); + let dht_event = { + let (key, value) = build_dht_event(vec![remote_multiaddr.clone()], remote_key, &remote_key_store); + sc_network::DhtEvent::ValueFound(vec![(key, value)]) + }; + dht_event_tx.try_send(dht_event).expect("Channel has capacity of 1."); + + // Assert worker to trigger another lookup. + assert_eq!(Poll::Pending, worker.poll_unpin(cx)); + assert_eq!(worker.pending_lookups.len(), remote_public_keys.len() - MAX_IN_FLIGHT_LOOKUPS - 1); + assert_eq!(worker.in_flight_lookups.len(), MAX_IN_FLIGHT_LOOKUPS); + assert_eq!(network.get_value_call.lock().unwrap().len(), MAX_IN_FLIGHT_LOOKUPS); + + // Make second one fail. + let remote_hash = network.get_value_call.lock().unwrap().pop().unwrap(); + let dht_event = sc_network::DhtEvent::ValueNotFound(remote_hash); + dht_event_tx.try_send(dht_event).expect("Channel has capacity of 1."); + + // Assert worker to trigger another lookup. + assert_eq!(Poll::Pending, worker.poll_unpin(cx)); + assert_eq!(worker.pending_lookups.len(), remote_public_keys.len() - MAX_IN_FLIGHT_LOOKUPS - 2); + assert_eq!(worker.in_flight_lookups.len(), MAX_IN_FLIGHT_LOOKUPS); + assert_eq!(network.get_value_call.lock().unwrap().len(), MAX_IN_FLIGHT_LOOKUPS); + + worker.refill_pending_lookups_queue().unwrap(); + + // Assert worker to restock pending lookups and forget about in-flight lookups. + assert_eq!(worker.pending_lookups.len(), remote_public_keys.len()); + assert_eq!(worker.in_flight_lookups.len(), 0); + + Poll::Ready(()) + })); +}