From 5d7f6d5984a849095d3c7759ba3b4c73b00264ab Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 27 Jun 2020 14:58:07 -0400 Subject: [PATCH 01/26] set up data types and control flow for statement distribution --- Cargo.lock | 18 ++ Cargo.toml | 1 + .../network/statement-distribution/Cargo.toml | 22 +++ .../network/statement-distribution/src/lib.rs | 165 ++++++++++++++++++ node/primitives/src/lib.rs | 23 +-- node/subsystem/src/messages.rs | 5 +- primitives/src/parachain.rs | 2 +- 7 files changed, 224 insertions(+), 12 deletions(-) create mode 100644 node/network/statement-distribution/Cargo.toml create mode 100644 node/network/statement-distribution/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 946344001940..9ece1cfbd144 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4785,6 +4785,24 @@ dependencies = [ "westend-runtime", ] +[[package]] +name = "polkadot-statement-distribution" +version = "0.1.0" +dependencies = [ + "assert_matches", + "futures 0.3.5", + "futures-timer 3.0.2", + "log 0.4.8", + "parity-scale-codec", + "parking_lot 0.10.2", + "polkadot-node-primitives", + "polkadot-node-subsystem", + "polkadot-primitives", + "polkadot-subsystem-test-helpers", + "sp-runtime", + "streamunordered", +] + [[package]] name = "polkadot-statement-table" version = "0.8.13" diff --git a/Cargo.toml b/Cargo.toml index 4a72ae8dc6f9..28f57c07e5c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ members = [ "validation", "node/network/bridge", + "node/network/statement-distribution", "node/overseer", "node/primitives", "node/service", diff --git a/node/network/statement-distribution/Cargo.toml b/node/network/statement-distribution/Cargo.toml new file mode 100644 index 000000000000..7a780a8eb8ff --- /dev/null +++ b/node/network/statement-distribution/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "polkadot-statement-distribution" +version = "0.1.0" +authors = ["Parity Technologies "] +description = "Statement Distribution Subsystem" +edition = "2018" + +[dependencies] +futures = "0.3.5" +log = "0.4.8" +futures-timer = "3.0.2" +streamunordered = "0.5.1" +polkadot-primitives = { path = "../../../primitives" } +node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" } +parity-scale-codec = "1.3.0" +sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } + +[dev-dependencies] +parking_lot = "0.10.0" +subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" } +assert_matches = "1.3.0" diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs new file mode 100644 index 000000000000..422b4203bbc7 --- /dev/null +++ b/node/network/statement-distribution/src/lib.rs @@ -0,0 +1,165 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! The Statement Distribution Subsystem. +//! +//! This is responsible for distributing signed statements about candidate +//! validity amongst validators. + +use polkadot_subsystem::{ + Subsystem, SubsystemResult, SubsystemError, SubsystemContext, SpawnedSubsystem, + FromOverseer, OverseerSignal, +}; +use polkadot_subsystem::messages::{ + AllMessages, NetworkBridgeMessage, NetworkBridgeEvent, StatementDistributionMessage, + PeerId, ObservedRole, ReputationChange as Rep, +}; +use node_primitives::{ProtocolId, View, SignedFullStatement}; +use polkadot_primitives::Hash; +use polkadot_primitives::parachain::{CompactStatement, ValidatorIndex}; + +use futures::prelude::*; + +use std::cmp::Ordering; +use std::collections::{HashMap, HashSet, BTreeSet}; + +const PROTOCOL_V1: ProtocolId = *b"sdn1"; + +const COST_UNEXPECTED_STATEMENT: Rep = Rep::new(-100, "Unexpected Statement"); +const COST_INVALID_SIGNATURE: Rep = Rep::new(-500, "Invalid Statement Signature"); + +const BENEFIT_VALID_STATEMENT: Rep = Rep::new(25, "Peer provided a valid statement"); + +/// The statement distribution subsystem. +pub struct StatementDistribution; + +impl Subsystem for StatementDistribution + where C: SubsystemContext +{ + fn start(self, ctx: C) -> SpawnedSubsystem { + // Swallow error because failure is fatal to the node and we log with more precision + // within `run`. + SpawnedSubsystem(run(ctx).map(|_| ()).boxed()) + } +} + +fn network_update_message(n: NetworkBridgeEvent) -> AllMessages { + AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate(n)) +} + +// knowledge that a peer has about goings-on in a relay parent. +struct PeerRelayParentKnowledge { + // candidates that a peer is aware of. This indicates that we can send + // statements pertaining to that candidate. + known_candidates: HashSet, + // fingerprints of all statements a peer should be aware of: those that + // were sent to us by the peer or sent to the peer by us. + known_statements: HashSet<(CompactStatement, ValidatorIndex)>, +} + +struct PeerData { + role: ObservedRole, + view: View, + view_knowledge: HashMap, +} + +// A statement stored while a relay chain head is active. +// +// These are orderable first by (Seconded, Valid, Invalid), then by the underlying hash, +// and lastly by the signing validator's index. +#[derive(PartialEq, Eq)] +struct StoredStatement { + compact: CompactStatement, + statement: SignedFullStatement, +} + +impl StoredStatement { + fn fingerprint(&self) -> (CompactStatement, ValidatorIndex) { + (self.compact.clone(), self.statement.validator_index()) + } +} + +impl PartialOrd for StoredStatement { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for StoredStatement { + fn cmp(&self, other: &Self) -> Ordering { + let to_idx = |x: &CompactStatement| match *x { + CompactStatement::Candidate(_) => 0u8, + CompactStatement::Valid(_) => 1, + CompactStatement::Invalid(_) => 2, + }; + + match (&self.compact, &other.compact) { + (&CompactStatement::Candidate(ref h), &CompactStatement::Candidate(ref h2)) | + (&CompactStatement::Valid(ref h), &CompactStatement::Valid(ref h2)) | + (&CompactStatement::Invalid(ref h), &CompactStatement::Invalid(ref h2)) => { + h.cmp(h2).then( + self.statement.validator_index().cmp(&other.statement.validator_index()) + ) + }, + + (ref a, ref b) => to_idx(a).cmp(&to_idx(b)), + } + } +} + +struct ActiveHeadData { + // All candidates we are aware of for this head, keyed by hash. + candidates: HashSet, + // Stored statements for circulation to peers. + statements: BTreeSet, +} + +async fn run( + mut ctx: impl SubsystemContext, +) -> SubsystemResult<()> { + // startup: register the network protocol with the bridge. + ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::RegisterEventProducer( + PROTOCOL_V1, + network_update_message, + ))).await?; + + let mut peers: HashMap = HashMap::new(); + let mut our_view = View::default(); + let mut active_heads: HashMap = HashMap::new(); + + loop { + let message = ctx.recv().await?; + match message { + FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)) => { + + } + FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)) => { + + } + FromOverseer::Signal(OverseerSignal::Conclude) => break, + FromOverseer::Communication { msg } => match msg { + StatementDistributionMessage::Share(relay_parent, statement) => { + // place into `active_heads` and circulate to all peers with + // the head in their view. + } + StatementDistributionMessage::NetworkBridgeUpdate(event) => match event { + _ => unimplemented!(), + } + } + } + } + Ok(()) +} diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index 527e6aaea274..b1a6f9fb0973 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -29,7 +29,7 @@ use polkadot_primitives::{Hash, }; /// A statement, where the candidate receipt is included in the `Seconded` variant. -#[derive(Debug, Clone, PartialEq, Encode, Decode)] +#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] pub enum Statement { /// A statement that a validator seconds a candidate. #[codec(index = "1")] @@ -42,16 +42,19 @@ pub enum Statement { Invalid(Hash), } +impl Statement { + pub fn to_compact(&self) -> CompactStatement { + match *self { + Statement::Seconded(ref c) => CompactStatement::Candidate(c.hash()), + Statement::Valid(hash) => CompactStatement::Valid(hash), + Statement::Invalid(hash) => CompactStatement::Invalid(hash), + } + } +} + impl EncodeAs for Statement { fn encode_as(&self) -> Vec { - let statement = match *self { - Statement::Seconded(ref c) => { - polkadot_primitives::parachain::CompactStatement::Candidate(c.hash()) - } - Statement::Valid(hash) => polkadot_primitives::parachain::CompactStatement::Valid(hash), - Statement::Invalid(hash) => polkadot_primitives::parachain::CompactStatement::Invalid(hash), - }; - statement.encode() + self.to_compact().encode() } } @@ -87,5 +90,5 @@ pub type ProtocolId = [u8; 4]; /// A succinct representation of a peer's view. This consists of a bounded amount of chain heads. /// /// Up to `N` (5?) chain heads. -#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] +#[derive(Default, Debug, Clone, PartialEq, Eq, Encode, Decode)] pub struct View(pub Vec); diff --git a/node/subsystem/src/messages.rs b/node/subsystem/src/messages.rs index c22581349078..12aec55c5b84 100644 --- a/node/subsystem/src/messages.rs +++ b/node/subsystem/src/messages.rs @@ -24,7 +24,6 @@ use futures::channel::{mpsc, oneshot}; -use sc_network::{ObservedRole, ReputationChange, PeerId}; use polkadot_primitives::{BlockNumber, Hash, Signature}; use polkadot_primitives::parachain::{ AbridgedCandidateReceipt, PoVBlock, ErasureChunk, BackedCandidate, Id as ParaId, @@ -34,6 +33,8 @@ use polkadot_node_primitives::{ MisbehaviorReport, SignedFullStatement, View, ProtocolId, }; +pub use sc_network::{ObservedRole, ReputationChange, PeerId}; + /// A notification of a new backed candidate. #[derive(Debug)] pub struct NewBackedCandidate(pub BackedCandidate); @@ -223,4 +224,6 @@ pub enum AllMessages { RuntimeApi(RuntimeApiMessage), /// Message for the availability store subsystem. AvailabilityStore(AvailabilityStoreMessage), + /// Message for the network bridge subsystem. + NetworkBridge(NetworkBridgeMessage), } diff --git a/primitives/src/parachain.rs b/primitives/src/parachain.rs index 52306174be5d..427c64bfd48d 100644 --- a/primitives/src/parachain.rs +++ b/primitives/src/parachain.rs @@ -590,7 +590,7 @@ pub struct Activity(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec Date: Sat, 27 Jun 2020 15:03:32 -0400 Subject: [PATCH 02/26] add some set-like methods to View --- node/primitives/src/lib.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index b1a6f9fb0973..173a9b7d920a 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -92,3 +92,20 @@ pub type ProtocolId = [u8; 4]; /// Up to `N` (5?) chain heads. #[derive(Default, Debug, Clone, PartialEq, Eq, Encode, Decode)] pub struct View(pub Vec); + +impl View { + /// Returns an iterator of the hashes present in `Self` but not in `other`. + pub fn difference<'a>(&'a self, other: &'a View) -> impl Iterator + 'a { + self.0.iter().cloned().filter(move |h| !other.contains(h)) + } + + /// An iterator containing hashes present in both `Self` and in `other`. + pub fn intersection<'a>(&'a self, other: &'a View) -> impl Iterator + 'a { + self.0.iter().cloned().filter(move |h| other.contains(h)) + } + + /// Whether the view contains a given hash. + pub fn contains(&self, hash: &Hash) -> bool { + self.0.contains(hash) + } +} From f8992a2327a994ea55229ccefaef5f3fecc6391a Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 27 Jun 2020 15:43:16 -0400 Subject: [PATCH 03/26] implement sending to peers --- .../network/statement-distribution/src/lib.rs | 191 ++++++++++++++---- 1 file changed, 156 insertions(+), 35 deletions(-) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 422b4203bbc7..b9fc0eeb2879 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -30,11 +30,11 @@ use polkadot_subsystem::messages::{ use node_primitives::{ProtocolId, View, SignedFullStatement}; use polkadot_primitives::Hash; use polkadot_primitives::parachain::{CompactStatement, ValidatorIndex}; +use parity_scale_codec::{Encode, Decode}; use futures::prelude::*; -use std::cmp::Ordering; -use std::collections::{HashMap, HashSet, BTreeSet}; +use std::collections::{HashMap, HashSet}; const PROTOCOL_V1: ProtocolId = *b"sdn1"; @@ -70,12 +70,60 @@ struct PeerRelayParentKnowledge { known_statements: HashSet<(CompactStatement, ValidatorIndex)>, } +impl PeerRelayParentKnowledge { + /// Attempt to update our view of the peer's knowledge with this statement's fingerprint. + /// + /// This returns `false` if the peer cannot accept this statement, without altering internal + /// state. + /// + /// If the peer can accept the statement, this returns `true` and updates the internal state. + /// Once the knowledge has incorporated a statement, it cannot be incorporated again. + fn accept(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> bool { + if self.known_statements.contains(fingerprint) { + return false; + } + + match fingerprint.0 { + CompactStatement::Candidate(ref h) => { + self.known_candidates.insert(h.clone()); + }, + CompactStatement::Valid(ref h) | CompactStatement::Invalid(ref h) => { + // The peer can only accept Valid and Invalid statements for which it is aware + // of the corresponding candidate. + if self.known_candidates.contains(h) { + return false; + } + } + } + + self.known_statements.insert(fingerprint.clone()); + true + } +} + struct PeerData { role: ObservedRole, view: View, view_knowledge: HashMap, } +impl PeerData { + /// Attempt to update our view of the peer's knowledge with this statement's fingerprint. + /// + /// This returns `false` if the peer cannot accept this statement, without altering internal + /// state. + /// + /// If the peer can accept the statement, this returns `true` and updates the internal state. + /// Once the knowledge has incorporated a statement, it cannot be incorporated again. + fn accept( + &mut self, + relay_parent: &Hash, + fingerprint: &(CompactStatement, ValidatorIndex), + ) -> bool { + self.view_knowledge.get_mut(relay_parent).map_or(false, |k| k.accept(fingerprint)) + } +} + // A statement stored while a relay chain head is active. // // These are orderable first by (Seconded, Valid, Invalid), then by the underlying hash, @@ -92,39 +140,102 @@ impl StoredStatement { } } -impl PartialOrd for StoredStatement { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) +impl std::borrow::Borrow for StoredStatement { + fn borrow(&self) -> &CompactStatement { + &self.compact } } -impl Ord for StoredStatement { - fn cmp(&self, other: &Self) -> Ordering { - let to_idx = |x: &CompactStatement| match *x { - CompactStatement::Candidate(_) => 0u8, - CompactStatement::Valid(_) => 1, - CompactStatement::Invalid(_) => 2, - }; - - match (&self.compact, &other.compact) { - (&CompactStatement::Candidate(ref h), &CompactStatement::Candidate(ref h2)) | - (&CompactStatement::Valid(ref h), &CompactStatement::Valid(ref h2)) | - (&CompactStatement::Invalid(ref h), &CompactStatement::Invalid(ref h2)) => { - h.cmp(h2).then( - self.statement.validator_index().cmp(&other.statement.validator_index()) - ) - }, - - (ref a, ref b) => to_idx(a).cmp(&to_idx(b)), - } +impl std::hash::Hash for StoredStatement { + fn hash(&self, state: &mut H) { + self.fingerprint().hash(state) } } struct ActiveHeadData { // All candidates we are aware of for this head, keyed by hash. candidates: HashSet, - // Stored statements for circulation to peers. - statements: BTreeSet, + // Stored seconded statements for circulation to peers. + seconded_statements: HashSet, + // Stored other statements for circulation to peers. + other_statements: HashSet, +} + +impl ActiveHeadData { + /// Note the given statement. If it was not already known, returns `Some`, with a handle to the + /// statement. + fn note_statement(&mut self, statement: SignedFullStatement) -> Option<&StoredStatement> { + let compact = statement.payload().to_compact(); + let stored = StoredStatement { + compact: compact.clone(), + statement, + }; + + match compact { + CompactStatement::Candidate(h) => { + self.candidates.insert(h); + if self.seconded_statements.insert(stored) { + // This will always return `Some` because it was just inserted. + self.seconded_statements.get(&compact) + } else { + None + } + } + CompactStatement::Valid(_) | CompactStatement::Invalid(_) => { + if self.other_statements.insert(stored) { + // This will always return `Some` because it was just inserted. + self.other_statements.get(&compact) + } else { + None + } + } + } + } + + /// Get an iterator over all statements for the active head. Seconded statements come first. + fn statements(&self) -> impl Iterator + '_ { + self.seconded_statements.iter().chain(self.other_statements.iter()) + } +} + +async fn share_message( + peers: &mut HashMap, + active_heads: &mut HashMap, + ctx: &mut impl SubsystemContext, + relay_parent: Hash, + statement: SignedFullStatement, +) -> SubsystemResult<()> { + if let Some(stored) + = active_heads.get_mut(&relay_parent).and_then(|d| d.note_statement(statement)) + { + let fingerprint = stored.fingerprint(); + let peers_to_send: Vec<_> = peers.iter_mut() + .filter_map(|(p, data)| if data.accept(&relay_parent, &fingerprint) { + Some(p.clone()) + } else { + None + }) + .collect(); + + if peers_to_send.is_empty() { return Ok(()) } + + let payload = stored.statement.encode(); + ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( + peers_to_send, + PROTOCOL_V1, + payload, + ))).await?; + } + Ok(()) +} + +async fn handle_network_update( + peers: &mut HashMap, + active_heads: &mut HashMap, + ctx: &mut impl SubsystemContext, + update: NetworkBridgeEvent, +) -> SubsystemResult<()> { + Ok(()) } async fn run( @@ -144,20 +255,30 @@ async fn run( let message = ctx.recv().await?; match message { FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)) => { - + active_heads.entry(relay_parent).or_insert(ActiveHeadData { + candidates: HashSet::new(), + seconded_statements: HashSet::new(), + other_statements: HashSet::new(), + }); } FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)) => { - + // do nothing - we will handle this when our view changes. } FromOverseer::Signal(OverseerSignal::Conclude) => break, FromOverseer::Communication { msg } => match msg { - StatementDistributionMessage::Share(relay_parent, statement) => { - // place into `active_heads` and circulate to all peers with - // the head in their view. - } - StatementDistributionMessage::NetworkBridgeUpdate(event) => match event { - _ => unimplemented!(), - } + StatementDistributionMessage::Share(relay_parent, statement) => share_message( + &mut peers, + &mut active_heads, + &mut ctx, + relay_parent, + statement, + ).await?, + StatementDistributionMessage::NetworkBridgeUpdate(event) => handle_network_update( + &mut peers, + &mut active_heads, + &mut ctx, + event, + ).await?, } } } From 72b339b634bf71f6d9081ec8bf27541708a2b9f6 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 29 Jun 2020 19:00:12 -0400 Subject: [PATCH 04/26] start fixing equivocation handling --- .../network/statement-distribution/Cargo.toml | 1 + .../network/statement-distribution/src/lib.rs | 238 +++++++++++++++--- 2 files changed, 209 insertions(+), 30 deletions(-) diff --git a/node/network/statement-distribution/Cargo.toml b/node/network/statement-distribution/Cargo.toml index 7a780a8eb8ff..dc83d15e8b2c 100644 --- a/node/network/statement-distribution/Cargo.toml +++ b/node/network/statement-distribution/Cargo.toml @@ -14,6 +14,7 @@ polkadot-primitives = { path = "../../../primitives" } node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" } parity-scale-codec = "1.3.0" sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } [dev-dependencies] diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index b9fc0eeb2879..7dedbda0342f 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -29,7 +29,9 @@ use polkadot_subsystem::messages::{ }; use node_primitives::{ProtocolId, View, SignedFullStatement}; use polkadot_primitives::Hash; -use polkadot_primitives::parachain::{CompactStatement, ValidatorIndex}; +use polkadot_primitives::parachain::{ + CompactStatement, ValidatorIndex, ValidatorId, SigningContext, +}; use parity_scale_codec::{Encode, Decode}; use futures::prelude::*; @@ -40,6 +42,7 @@ const PROTOCOL_V1: ProtocolId = *b"sdn1"; const COST_UNEXPECTED_STATEMENT: Rep = Rep::new(-100, "Unexpected Statement"); const COST_INVALID_SIGNATURE: Rep = Rep::new(-500, "Invalid Statement Signature"); +const COST_INVALID_MESSAGE: Rep = Rep::new(-500, "Invalid message"); const BENEFIT_VALID_STATEMENT: Rep = Rep::new(25, "Peer provided a valid statement"); @@ -68,6 +71,8 @@ struct PeerRelayParentKnowledge { // fingerprints of all statements a peer should be aware of: those that // were sent to us by the peer or sent to the peer by us. known_statements: HashSet<(CompactStatement, ValidatorIndex)>, + // How many candidates this peer is aware of for each given validator index. + seconded_counts: HashMap, } impl PeerRelayParentKnowledge { @@ -85,6 +90,14 @@ impl PeerRelayParentKnowledge { match fingerprint.0 { CompactStatement::Candidate(ref h) => { + // Each peer is allowed to be aware of two seconded statements per validator per + // relay-parent hash. + let count = self.seconded_counts.entry(fingerprint.1).or_insert(0); + if *count >= 2 { + return false; + } + *count += 1; + self.known_candidates.insert(h.clone()); }, CompactStatement::Valid(ref h) | CompactStatement::Invalid(ref h) => { @@ -153,17 +166,38 @@ impl std::hash::Hash for StoredStatement { } struct ActiveHeadData { - // All candidates we are aware of for this head, keyed by hash. + /// All candidates we are aware of for this head, keyed by hash. candidates: HashSet, - // Stored seconded statements for circulation to peers. + /// Stored seconded statements for circulation to peers. seconded_statements: HashSet, - // Stored other statements for circulation to peers. + /// Stored other statements for circulation to peers. other_statements: HashSet, + /// The validators at this head. + validators: Vec, + /// The session index this head is at. + session_index: sp_staking::SessionIndex, } impl ActiveHeadData { - /// Note the given statement. If it was not already known, returns `Some`, with a handle to the - /// statement. + fn new(validators: Vec, session_index: sp_staking::SessionIndex) -> Self { + ActiveHeadData { + candidates: Default::default(), + seconded_statements: Default::default(), + other_statements: Default::default(), + validators, + session_index, + } + } + + /// Note the given statement. + /// + /// If it was not already known and can be accepted, returns `Some`, + /// with a handle to the statement. + /// + /// `Seconded` statements are always accepted, and are assumed to have passed flood-mitigation + /// measures before reaching this point. + /// + /// Other statements that reference a candidate we are not aware of cannot be accepted. fn note_statement(&mut self, statement: SignedFullStatement) -> Option<&StoredStatement> { let compact = statement.payload().to_compact(); let stored = StoredStatement { @@ -181,7 +215,11 @@ impl ActiveHeadData { None } } - CompactStatement::Valid(_) | CompactStatement::Invalid(_) => { + CompactStatement::Valid(h) | CompactStatement::Invalid(h) => { + if !self.candidates.contains(&h) { + return None; + } + if self.other_statements.insert(stored) { // This will always return `Some` because it was just inserted. self.other_statements.get(&compact) @@ -198,6 +236,29 @@ impl ActiveHeadData { } } +/// Check a statement signature under this parent hash. +fn check_statement_signature( + head: &ActiveHeadData, + relay_parent: Hash, + statement: &SignedFullStatement, +) -> Result<(), ()> { + let signing_context = SigningContext { + session_index: head.session_index, + parent_hash: relay_parent, + }; + + match head.validators.get(statement.validator_index() as usize) { + None => return Err(()), + Some(v) => statement.check_signature(&signing_context, v), + } +} + +#[derive(Encode, Decode)] +enum WireMessage { + /// relay-parent, full statement. + Statement(Hash, SignedFullStatement), +} + async fn share_message( peers: &mut HashMap, active_heads: &mut HashMap, @@ -208,34 +269,153 @@ async fn share_message( if let Some(stored) = active_heads.get_mut(&relay_parent).and_then(|d| d.note_statement(statement)) { - let fingerprint = stored.fingerprint(); - let peers_to_send: Vec<_> = peers.iter_mut() - .filter_map(|(p, data)| if data.accept(&relay_parent, &fingerprint) { - Some(p.clone()) - } else { - None - }) - .collect(); - - if peers_to_send.is_empty() { return Ok(()) } - - let payload = stored.statement.encode(); - ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( - peers_to_send, - PROTOCOL_V1, - payload, - ))).await?; + send_stored_to_peers(peers, ctx, relay_parent, stored).await?; } Ok(()) } +async fn send_stored_to_peers( + peers: &mut HashMap, + ctx: &mut impl SubsystemContext, + relay_parent: Hash, + stored: &StoredStatement, +) -> SubsystemResult<()> { + let fingerprint = stored.fingerprint(); + let peers_to_send: Vec<_> = peers.iter_mut() + .filter_map(|(p, data)| if data.accept(&relay_parent, &fingerprint) { + Some(p.clone()) + } else { + None + }) + .collect(); + + if peers_to_send.is_empty() { return Ok(()) } + + let payload = WireMessage::Statement(relay_parent, stored.statement.clone()).encode(); + ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( + peers_to_send, + PROTOCOL_V1, + payload, + ))).await?; + + Ok(()) +} + +async fn report_peer( + ctx: &mut impl SubsystemContext, + peer: PeerId, + rep: Rep, +) -> SubsystemResult<()> { + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer, rep) + )).await +} + +// Handle an incoming wire message. Returns a reference to a newly-stored statement +// if we were not already aware of it, along with the corresponding relay-parent. +// +// This function checks the signature and ensures the statement is compatible with our +// view. +async fn handle_incoming_message<'a>( + peer: PeerId, + peer_data: &mut PeerData, + our_view: &View, + active_heads: &'a mut HashMap, + ctx: &mut impl SubsystemContext, + message: Vec, +) -> SubsystemResult> { + let (relay_parent, statement) = match WireMessage::decode(&mut &message[..]) { + Err(_) => return report_peer(ctx, peer, COST_INVALID_MESSAGE).await.map(|_| None), + Ok(WireMessage::Statement(r, s)) => (r, s), + }; + + if !our_view.contains(relay_parent) { + return report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await.map(|_| None); + } + + let active_head = match active_heads.get_mut(relay_parent) { + Some(h) => h, + None => { + // This should never be out-of-sync with our view if the view updates + // correspond to actual `StartWork` messages. So we just log and ignore. + log::warn!("Our view out-of-sync with active heads. Head {} not found", relay_parent); + return Ok(None); + } + }; + + // check the signature on the statement. + if let Err(()) = check_statement_signature(&active_head, relay_parent, &statement) { + return report_peer(ctx, peer, COST_INVALID_SIGNATURE).await.map(|_| None); + } + + // Ensure the statement is stored in the peer data. + // + // Note that if the peer is sending us something that is not within their view, + // it will not be kept within their log. + let fingerprint = (statement.payload().to_compact(), statement.validator_index()); + if !peer_data.accept(&relay_parent, fingerprint) { + // If the peer was already aware of this statement or it was not within their own view, + // then we note the peer as being costly. + // + // This can race if we send the message to the peer at exactly the same time, + // but the report cost is relatively low, and the expected amounts of reports due to + // that race is also low. + // + // Regardless, this serves as a deterrent for peers to avoid spamming us with the same + // message over and over again, as we will disconnect from such peers. + report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await?; + } + + // Note: `peer_data.accept` already ensures that the statement is not an unbounded equivocation + // or unpinned to a seconded candidate. So it is safe to place it into the storage. + Ok(active_head.note_statement(statement).map(|s| (relay_parent, s))) +} + async fn handle_network_update( peers: &mut HashMap, active_heads: &mut HashMap, ctx: &mut impl SubsystemContext, + our_view: &mut View, update: NetworkBridgeEvent, ) -> SubsystemResult<()> { - Ok(()) + match update { + NetworkBridgeEvent::PeerConnected(peer, role) => { + peers.insert(peer, PeerData { + role, + view: Default::default(), + view_knowledge: Default::default(), + }); + + Ok(()) + } + NetworkBridgeEvent::PeerDisconnected(peer) => { + peers.remove(&peer); + Ok(()) + } + NetworkBridgeEvent::PeerMessage(peer, message) => { + match peers.get_mut(&peer) { + Some(data) => handle_incoming_message( + peer, + data, + &*our_view, + active_heads, + ctx, + message, + ).await?, + None => Ok(()), + } + + } + NetworkBridgeEvent::PeerViewChange(peer, view) => { + // 1. Update the view. + // 2. Send this peer all messages that we have for new active heads in the view. + } + NetworkBridgeEvent::OurViewChange(view) => { + // 1. Update our view. + // 2. Clean up everything that is not in the new view. + } + } + } async fn run( @@ -255,11 +435,8 @@ async fn run( let message = ctx.recv().await?; match message { FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)) => { - active_heads.entry(relay_parent).or_insert(ActiveHeadData { - candidates: HashSet::new(), - seconded_statements: HashSet::new(), - other_statements: HashSet::new(), - }); + active_heads.entry(relay_parent) + .or_insert(ActiveHeadData::new(unimplemented!(), unimplemented!())); } FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)) => { // do nothing - we will handle this when our view changes. @@ -277,6 +454,7 @@ async fn run( &mut peers, &mut active_heads, &mut ctx, + &mut our_view, event, ).await?, } From 6a16cd7328a6a0cf1916de57487be3a79ebe7ecb Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 29 Jun 2020 20:18:27 -0400 Subject: [PATCH 05/26] Add a section to the statement distribution subsystem on equivocations and flood protection --- .../node/backing/statement-distribution.md | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/roadmap/implementors-guide/src/node/backing/statement-distribution.md b/roadmap/implementors-guide/src/node/backing/statement-distribution.md index 59e5244d0d76..3d35c6ec20fe 100644 --- a/roadmap/implementors-guide/src/node/backing/statement-distribution.md +++ b/roadmap/implementors-guide/src/node/backing/statement-distribution.md @@ -24,14 +24,14 @@ Statement Distribution is the only backing subsystem which has any notion of pee It is responsible for distributing signed statements that we have generated and forwarding them, and for detecting a variety of Validator misbehaviors for reporting to [Misbehavior Arbitration](../utility/misbehavior-arbitration.md). During the Backing stage of the inclusion pipeline, it's the main point of contact with peer nodes. On receiving a signed statement from a peer, assuming the peer receipt state machine is in an appropriate state, it sends the Candidate Receipt to the [Candidate Backing subsystem](candidate-backing.md) to handle the validator's statement. -Track equivocating validators and stop accepting information from them. Forward double-vote proofs to the double-vote reporting system. Establish a data-dependency order: +Track equivocating validators and stop accepting information from them. Establish a data-dependency order: - In order to receive a `Seconded` message we have the on corresponding chain head in our view - In order to receive an `Invalid` or `Valid` message we must have received the corresponding `Seconded` message. And respect this data-dependency order from our peers by respecting their views. This subsystem is responsible for checking message signatures. -The Statement Distribution subsystem sends statements to peer nodes and detects double-voting by validators. When validators conflict with each other or themselves, the Misbehavior Arbitration system is notified. +The Statement Distribution subsystem sends statements to peer nodes. ## Peer Receipt State Machine @@ -53,4 +53,21 @@ This system implies a certain level of duplication of messages--we received X's And respect this data-dependency order from our peers. This subsystem is responsible for checking message signatures. -No jobs, `StartWork` and `StopWork` pulses are used to control neighbor packets and what we are currently accepting. +No jobs. We follow view changes from the [`NetworkBridge`](../utility/network-bridge.md), which in turn is updated by the overseer. + +## Equivocations and Flood Protection + +An equivocation is a double-vote by a validator. The [Candidate Backing](candidate-backing.md) Subsystem is better-suited than this one to detect equivocations as it adds votes to quorum trackers. + +At this level, we are primarily concerned about flood-protection, and to some extent, detecting equivocations is a part of that. In particular, we are interested in detecting equivocations of `Seconded` statements. Since every other statement is dependent on `Seconded` statements, ensuring that we only ever hold a bounded number of `Seconded` statements is sufficient for flood-protection. + +The simple approach is to say that we only receive up to two `Seconded` statements per validator per chain head. However, the marginal cost of equivocation, conditional on having already equivocated, is close to 0, since a single double-vote offence is counted as all double-vote offences for a particular chain-head. Even if it were not, there is some amount of equivocations that can be done such that the marginal cost of issuing further equivocations is close to 0, as there would be an amount of equivocations necessary to be completely and totally obliterated by the slashing algorithm. We fear the validator with nothing left to lose. + +With that in mind, this simple approach has a caveat worth digging deeper into. + +First: We may be aware of two equivocated `Seconded` statements issued by a validator. A totally honest peer of ours can also be aware of one or two different `Seconded` statements issued by the same validator. And yet another peer may be aware of one or two _more_ `Seconded` statements. And so on. This interacts badly with pre-emptive sending logic. Upon sending a `Seconded` statement to a peer, we will want to pre-emptively follow up with all statements relative to that candidate. Waiting for acknowledgement introduces latency at every hop, so that is best avoided. What can happen is that upon receipt of the `Seconded` statement, the peer will discard it as it falls beyond the bound of 2 that it is allowed to store. It cannot store anything in memory about discarded candidates as that would introduce a DoS vector. Then, the peer would receive from us all of the statements pertaining to that candidate, which, from its perspective, would be undesired - they are data-dependent on the `Seconded` statement we sent them, but they have erased all record of that from their memory. Upon receiving a potential flood of undesired statements, this 100% honest peer may choose to disconnect from us. In this way, an adversary may be able to partition the network with careful distribution of equivocated `Seconded` statements. + +The fix is to track, per-peer, the hashes of up to 4 candidates per validator (per relay-parent) that the peer is aware of. It is 4 because we may send them 2 and they may send us 2 different ones. We track the data that they are aware of as the union of things we have sent them and things they have sent us. If we receive a 1st or 2nd `Seconded` statement from a peer, we note it in the peer's known candidates even if we do disregard the data locally. And then, upon receipt of any data dependent on that statement, we do not reduce that peer's standing in our eyes, as the data was not undesired. + +There is another caveat to the fix: we don't want to allow the peer to flood us because it has set things up in a way that it knows we will drop all of its traffic. +We also track how many statements we have received per peer, per candidate, and per chain-head. This is any statement concerning a particular statement: `Seconded`, `Valid`, or `Invalid. If we ever receive a statement from a peer which would push any of these counters beyond twice the amount of validators at the chain-head, we begin to lower the peer's standing and eventually disconnect. This bound is a massive overestimate and could be reduced to twice the number of validators in the corresponding validator group. It is worth noting that the goal at the time of writing is to ensure any finite bound on the amount of stored data, as any equivocation results in a large slash. From ca3b523fd223f0570375b8076d104e80c1b981a7 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 30 Jun 2020 11:20:11 -0400 Subject: [PATCH 06/26] fix typo and amend wording --- .../src/node/backing/statement-distribution.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/roadmap/implementors-guide/src/node/backing/statement-distribution.md b/roadmap/implementors-guide/src/node/backing/statement-distribution.md index 3d35c6ec20fe..d05c68f7af70 100644 --- a/roadmap/implementors-guide/src/node/backing/statement-distribution.md +++ b/roadmap/implementors-guide/src/node/backing/statement-distribution.md @@ -70,4 +70,4 @@ First: We may be aware of two equivocated `Seconded` statements issued by a vali The fix is to track, per-peer, the hashes of up to 4 candidates per validator (per relay-parent) that the peer is aware of. It is 4 because we may send them 2 and they may send us 2 different ones. We track the data that they are aware of as the union of things we have sent them and things they have sent us. If we receive a 1st or 2nd `Seconded` statement from a peer, we note it in the peer's known candidates even if we do disregard the data locally. And then, upon receipt of any data dependent on that statement, we do not reduce that peer's standing in our eyes, as the data was not undesired. There is another caveat to the fix: we don't want to allow the peer to flood us because it has set things up in a way that it knows we will drop all of its traffic. -We also track how many statements we have received per peer, per candidate, and per chain-head. This is any statement concerning a particular statement: `Seconded`, `Valid`, or `Invalid. If we ever receive a statement from a peer which would push any of these counters beyond twice the amount of validators at the chain-head, we begin to lower the peer's standing and eventually disconnect. This bound is a massive overestimate and could be reduced to twice the number of validators in the corresponding validator group. It is worth noting that the goal at the time of writing is to ensure any finite bound on the amount of stored data, as any equivocation results in a large slash. +We also track how many statements we have received per peer, per candidate, and per chain-head. This is any statement concerning a particular candidate: `Seconded`, `Valid`, or `Invalid`. If we ever receive a statement from a peer which would push any of these counters beyond twice the amount of validators at the chain-head, we begin to lower the peer's standing and eventually disconnect. This bound is a massive overestimate and could be reduced to twice the number of validators in the corresponding validator group. It is worth noting that the goal at the time of writing is to ensure any finite bound on the amount of stored data, as any equivocation results in a large slash. From 84646869e374adc50e87fe822c5e22e7cd87dda8 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 30 Jun 2020 23:41:57 -0400 Subject: [PATCH 07/26] implement flood protection --- Cargo.lock | 2 + .../network/statement-distribution/Cargo.toml | 1 + .../network/statement-distribution/src/lib.rs | 278 +++++++++++++++--- 3 files changed, 235 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ece1cfbd144..b7aea203110b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4789,6 +4789,7 @@ dependencies = [ name = "polkadot-statement-distribution" version = "0.1.0" dependencies = [ + "arrayvec 0.5.1", "assert_matches", "futures 0.3.5", "futures-timer 3.0.2", @@ -4800,6 +4801,7 @@ dependencies = [ "polkadot-primitives", "polkadot-subsystem-test-helpers", "sp-runtime", + "sp-staking", "streamunordered", ] diff --git a/node/network/statement-distribution/Cargo.toml b/node/network/statement-distribution/Cargo.toml index dc83d15e8b2c..42b26d460430 100644 --- a/node/network/statement-distribution/Cargo.toml +++ b/node/network/statement-distribution/Cargo.toml @@ -16,6 +16,7 @@ parity-scale-codec = "1.3.0" sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } +arrayvec = "0.5.1" [dev-dependencies] parking_lot = "0.10.0" diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 7dedbda0342f..0c7893460b53 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -43,9 +43,28 @@ const PROTOCOL_V1: ProtocolId = *b"sdn1"; const COST_UNEXPECTED_STATEMENT: Rep = Rep::new(-100, "Unexpected Statement"); const COST_INVALID_SIGNATURE: Rep = Rep::new(-500, "Invalid Statement Signature"); const COST_INVALID_MESSAGE: Rep = Rep::new(-500, "Invalid message"); +const COST_DUPLICATE_STATEMENT: Rep = Rep::new(-250, "Statement sent more than once by peer"); +const COST_APPARENT_FLOOD: Rep = Rep::new(-1000, "Peer appears to be flooding us with statements"); const BENEFIT_VALID_STATEMENT: Rep = Rep::new(25, "Peer provided a valid statement"); +/// The maximum amount of candidates each validator is allowed to second at any relay-parent. +/// Short for "Validator Candidate Threshold". +/// +/// This is the amount of candidates we keep per validator at any relay-parent. +/// Typically we will only keep 1, but when a validator equivocates we will need to track 2. +const VC_THRESHOLD: usize = 2; + +/// The maximum amount of candidates each peer can be aware of each validator seconding at +/// any relay-parent. Short for "Validator Candidate per Peer Threshold". +/// +/// This is 2 times the `VC_THRESHOLD` because it includes the candidates in +/// our state that we may have sent them, and the candidates that they may have received from +/// other peers in the meantime. Peers are unlikely to ever be aware of more than 2 candidates +/// except in the case of a targeted attack by a sophisticated adversary. Nevertheless, we +/// establish a finite bound on memory used in such a situation. +const VC_PEER_THRESHOLD: usize = 2 * VC_THRESHOLD; + /// The statement distribution subsystem. pub struct StatementDistribution; @@ -63,55 +82,171 @@ fn network_update_message(n: NetworkBridgeEvent) -> AllMessages { AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate(n)) } +/// Tracks our impression of a single peer's view of the candidates a validator has seconded +/// for a given relay-parent. +/// +/// It is expected to receive at most `VC_THRESHOLD` from us and be aware of at most `VC_THRESHOLD` +/// via other means. +#[derive(Default)] +struct VcPerPeerTracker { + local_observed: arrayvec::ArrayVec<[Hash; VC_THRESHOLD]>, + remote_observed: arrayvec::ArrayVec<[Hash; VC_THRESHOLD]>, +} + +impl VcPerPeerTracker { + fn contains(&self, h: &Hash) -> bool { + self.local_observed.contains(h) || self.remote_observed.contains(h) + } + + // Note that the remote should now be aware that a validator has seconded a given candidate (by hash) + // based on a message that we have sent it from our local pool. + fn note_local(&mut self, h: Hash) { + if self.local_observed.contains(&h) { return } + + if self.local_observed.is_full() { + log::warn!("Statement distribution is erroneously attempting to distribute more \ + than {} candidate(s) per validator index. Ignoring", VC_THRESHOLD); + } else { + self.local_observed.try_push(h).expect("length of storage guarded above; \ + only panics if length exceeds capacity; qed"); + } + } + + // Note that the remote should now be aware that a validator has seconded a given candidate (by hash) + // based on a message that it has sent us. + // + // Returns `true` if the peer was allowed to send us such a message, `false` otherwise. + fn note_remote(&mut self, h: Hash) -> bool { + if self.remote_observed.contains(&h) { return true; } + + if self.remote_observed.is_full() { + return false; + } else { + self.remote_observed.try_push(h).expect("length of storage guarded above; \ + only panics if length exceeds capacity; qed"); + + true + } + } +} + // knowledge that a peer has about goings-on in a relay parent. struct PeerRelayParentKnowledge { - // candidates that a peer is aware of. This indicates that we can send - // statements pertaining to that candidate. + // candidates that the peer is aware of. This indicates that we can + // send other statements pertaining to that candidate. known_candidates: HashSet, // fingerprints of all statements a peer should be aware of: those that - // were sent to us by the peer or sent to the peer by us. - known_statements: HashSet<(CompactStatement, ValidatorIndex)>, + // were sent to the peer by us. + sent_statements: HashSet<(CompactStatement, ValidatorIndex)>, + // fingerprints of all statements a peer should be aware of: those that + // were sent to us by the peer. + received_statements: HashSet<(CompactStatement, ValidatorIndex)>, // How many candidates this peer is aware of for each given validator index. - seconded_counts: HashMap, + seconded_counts: HashMap, + // How many statements we've received for each candidate that we're aware of. + received_message_count: HashMap, } impl PeerRelayParentKnowledge { - /// Attempt to update our view of the peer's knowledge with this statement's fingerprint. + /// Attempt to update our view of the peer's knowledge with this statement's fingerprint based + /// on something that we would like to send to the peer. /// /// This returns `false` if the peer cannot accept this statement, without altering internal /// state. /// /// If the peer can accept the statement, this returns `true` and updates the internal state. /// Once the knowledge has incorporated a statement, it cannot be incorporated again. - fn accept(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> bool { - if self.known_statements.contains(fingerprint) { + fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> bool { + let already_known = self.sent_statements.contains(fingerprint) + || self.received_statements.contains(fingerprint); + + if already_known { return false; } match fingerprint.0 { CompactStatement::Candidate(ref h) => { - // Each peer is allowed to be aware of two seconded statements per validator per - // relay-parent hash. - let count = self.seconded_counts.entry(fingerprint.1).or_insert(0); - if *count >= 2 { - return false; - } - *count += 1; + self.seconded_counts.entry(fingerprint.1) + .or_insert_with(Default::default) + .note_local(h.clone()); self.known_candidates.insert(h.clone()); }, CompactStatement::Valid(ref h) | CompactStatement::Invalid(ref h) => { // The peer can only accept Valid and Invalid statements for which it is aware // of the corresponding candidate. - if self.known_candidates.contains(h) { + if !self.known_candidates.contains(h) { return false; } } } - self.known_statements.insert(fingerprint.clone()); + self.sent_statements.insert(fingerprint.clone()); true } + + /// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on + /// a message we are receiving from the peer. + /// + /// Provide the maximum message count that we can receive per candidate. In practice we should + /// not receive more statements for any one candidate than there are members in the group assigned + /// to that para, but this maximum needs to be lenient to account for equivocations that may be + /// cross-group. As such, a maximum of 2 * n_validators is recommended. + /// + /// This returns an error if the peer should not have sent us this message according to protocol + /// rules for flood protection. + /// + /// If this returns `Ok(())`, the internal state has been altered. After `receive`ing a new + /// candidate, we are then cleared to send the peer further statements about that candidate. + fn receive( + &mut self, + fingerprint: &(CompactStatement, ValidatorIndex), + max_message_count: usize, + ) -> Result<(), Rep> { + // We don't check `sent_statements` because a statement could be in-flight from both + // sides at the same time. + if self.received_statements.contains(fingerprint) { + return Err(COST_DUPLICATE_STATEMENT); + } + + let candidate_hash = match fingerprint.0 { + CompactStatement::Candidate(ref h) => { + let allowed_remote = self.seconded_counts.entry(fingerprint.1) + .or_insert_with(Default::default) + .note_remote(h.clone()); + + if !allowed_remote { + return Err(COST_UNEXPECTED_STATEMENT); + } + + h + } + CompactStatement::Valid(ref h)| CompactStatement::Invalid(ref h) => { + if !self.known_candidates.contains(&h) { + return Err(COST_UNEXPECTED_STATEMENT); + } + + h + } + }; + + { + let received_per_candidate = self.received_message_count + .entry(candidate_hash.clone()) + .or_insert(0); + + if *received_per_candidate + 1 >= max_message_count { + return Err(COST_APPARENT_FLOOD); + } + + *received_per_candidate += 1; + } + + self.received_statements.insert(fingerprint.clone()); + self.known_candidates.insert(candidate_hash.clone()); + + Ok(()) + } } struct PeerData { @@ -121,19 +256,43 @@ struct PeerData { } impl PeerData { - /// Attempt to update our view of the peer's knowledge with this statement's fingerprint. + /// Attempt to update our view of the peer's knowledge with this statement's fingerprint based + /// on something that we would like to send to the peer. /// /// This returns `false` if the peer cannot accept this statement, without altering internal /// state. /// /// If the peer can accept the statement, this returns `true` and updates the internal state. /// Once the knowledge has incorporated a statement, it cannot be incorporated again. - fn accept( + fn send( &mut self, relay_parent: &Hash, fingerprint: &(CompactStatement, ValidatorIndex), ) -> bool { - self.view_knowledge.get_mut(relay_parent).map_or(false, |k| k.accept(fingerprint)) + self.view_knowledge.get_mut(relay_parent).map_or(false, |k| k.send(fingerprint)) + } + + /// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on + /// a message we are receiving from the peer. + /// + /// Provide the maximum message count that we can receive per candidate. In practice we should + /// not receive more statements for any one candidate than there are members in the group assigned + /// to that para, but this maximum needs to be lenient to account for equivocations that may be + /// cross-group. As such, a maximum of 2 * n_validators is recommended. + /// + /// This returns an error if the peer should not have sent us this message according to protocol + /// rules for flood protection. + /// + /// If this returns `Ok(())`, the internal state has been altered. After `receive`ing a new + /// candidate, we are then cleared to send the peer further statements about that candidate. + fn receive( + &mut self, + relay_parent: &Hash, + fingerprint: &(CompactStatement, ValidatorIndex), + max_message_count: usize, + ) -> Result<(), Rep> { + self.view_knowledge.get_mut(relay_parent).ok_or(COST_UNEXPECTED_STATEMENT)? + .receive(fingerprint, max_message_count) } } @@ -176,6 +335,8 @@ struct ActiveHeadData { validators: Vec, /// The session index this head is at. session_index: sp_staking::SessionIndex, + /// How many `Seconded` statements we've seen per validator. + seconded_counts: HashMap, } impl ActiveHeadData { @@ -186,6 +347,7 @@ impl ActiveHeadData { other_statements: Default::default(), validators, session_index, + seconded_counts: Default::default(), } } @@ -194,12 +356,15 @@ impl ActiveHeadData { /// If it was not already known and can be accepted, returns `Some`, /// with a handle to the statement. /// - /// `Seconded` statements are always accepted, and are assumed to have passed flood-mitigation - /// measures before reaching this point. + /// We accept up to `VC_THRESHOLD` (2 at time of writing) `Seconded` statements + /// per validator. These will be the first ones we see. The statement is assumed + /// to have been checked, including that the validator index is not out-of-bounds and + /// the signature is valid. /// - /// Other statements that reference a candidate we are not aware of cannot be accepted. + /// Any other statements or those that reference a candidate we are not aware of cannot be accepted. fn note_statement(&mut self, statement: SignedFullStatement) -> Option<&StoredStatement> { let compact = statement.payload().to_compact(); + let validator_index = statement.validator_index(); let stored = StoredStatement { compact: compact.clone(), statement, @@ -207,6 +372,13 @@ impl ActiveHeadData { match compact { CompactStatement::Candidate(h) => { + let seconded_so_far = self.seconded_counts.entry(validator_index).or_insert(0); + if *seconded_so_far >= 2 { + return None; + } else { + *seconded_so_far += 1; + } + self.candidates.insert(h); if self.seconded_statements.insert(stored) { // This will always return `Some` because it was just inserted. @@ -282,7 +454,10 @@ async fn send_stored_to_peers( ) -> SubsystemResult<()> { let fingerprint = stored.fingerprint(); let peers_to_send: Vec<_> = peers.iter_mut() - .filter_map(|(p, data)| if data.accept(&relay_parent, &fingerprint) { + .filter_map(|(p, data)| if data.send(&relay_parent, &fingerprint) { + // TODO [now]: if this message indicates a fresh candidate, send + // unlocked messages to the peer. + Some(p.clone()) } else { None @@ -291,6 +466,7 @@ async fn send_stored_to_peers( if peers_to_send.is_empty() { return Ok(()) } + let payload = WireMessage::Statement(relay_parent, stored.statement.clone()).encode(); ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( peers_to_send, @@ -329,11 +505,11 @@ async fn handle_incoming_message<'a>( Ok(WireMessage::Statement(r, s)) => (r, s), }; - if !our_view.contains(relay_parent) { + if !our_view.contains(&relay_parent) { return report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await.map(|_| None); } - let active_head = match active_heads.get_mut(relay_parent) { + let active_head = match active_heads.get_mut(&relay_parent) { Some(h) => h, None => { // This should never be out-of-sync with our view if the view updates @@ -353,20 +529,18 @@ async fn handle_incoming_message<'a>( // Note that if the peer is sending us something that is not within their view, // it will not be kept within their log. let fingerprint = (statement.payload().to_compact(), statement.validator_index()); - if !peer_data.accept(&relay_parent, fingerprint) { - // If the peer was already aware of this statement or it was not within their own view, - // then we note the peer as being costly. - // - // This can race if we send the message to the peer at exactly the same time, - // but the report cost is relatively low, and the expected amounts of reports due to - // that race is also low. - // - // Regardless, this serves as a deterrent for peers to avoid spamming us with the same - // message over and over again, as we will disconnect from such peers. - report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await?; + let max_message_count = active_head.validators.len() * 2; + match peer_data.receive(&relay_parent, &fingerprint, max_message_count) { + Err(e) => { + report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await?; + return Ok(None) + } + Ok(()) => { + // TODO [now]: trigger send of new messages if this is a `Candidate` message. + } } - // Note: `peer_data.accept` already ensures that the statement is not an unbounded equivocation + // Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation // or unpinned to a seconded candidate. So it is safe to place it into the storage. Ok(active_head.note_statement(statement).map(|s| (relay_parent, s))) } @@ -394,25 +568,37 @@ async fn handle_network_update( } NetworkBridgeEvent::PeerMessage(peer, message) => { match peers.get_mut(&peer) { - Some(data) => handle_incoming_message( - peer, - data, - &*our_view, - active_heads, - ctx, - message, - ).await?, + Some(data) => { + let new_stored = handle_incoming_message( + peer, + data, + &*our_view, + active_heads, + ctx, + message, + ).await?; + + if let Some(new) = new_stored { + // TODO [now]: send to `CandidateBacking` subsystem. + } + + Ok(()) + } None => Ok(()), } } NetworkBridgeEvent::PeerViewChange(peer, view) => { + // TODO [now] // 1. Update the view. // 2. Send this peer all messages that we have for new active heads in the view. + Ok(()) } NetworkBridgeEvent::OurViewChange(view) => { + // TODO [now] // 1. Update our view. // 2. Clean up everything that is not in the new view. + Ok(()) } } From cac9adcec0f28fffbef8fcf412df62af7cd1827f Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 30 Jun 2020 23:56:52 -0400 Subject: [PATCH 08/26] have peer knowledge tracker follow when peer first learns about a candidate --- .../network/statement-distribution/src/lib.rs | 73 ++++++++++++------- 1 file changed, 47 insertions(+), 26 deletions(-) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 0c7893460b53..18de1d217c26 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -151,38 +151,48 @@ impl PeerRelayParentKnowledge { /// Attempt to update our view of the peer's knowledge with this statement's fingerprint based /// on something that we would like to send to the peer. /// - /// This returns `false` if the peer cannot accept this statement, without altering internal + /// This returns `None` if the peer cannot accept this statement, without altering internal /// state. /// - /// If the peer can accept the statement, this returns `true` and updates the internal state. + /// If the peer can accept the statement, this returns `Some` and updates the internal state. /// Once the knowledge has incorporated a statement, it cannot be incorporated again. - fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> bool { + /// + /// This returns `Some(Some(hash))` if this is the first time the peer has become aware of a + /// candidate with a given hash. + fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> Option> { let already_known = self.sent_statements.contains(fingerprint) || self.received_statements.contains(fingerprint); if already_known { - return false; + return None; } - match fingerprint.0 { + let (candidate_hash, new_known) = match fingerprint.0 { CompactStatement::Candidate(ref h) => { self.seconded_counts.entry(fingerprint.1) .or_insert_with(Default::default) .note_local(h.clone()); - self.known_candidates.insert(h.clone()); + (h.clone(), self.known_candidates.insert(h.clone())) }, CompactStatement::Valid(ref h) | CompactStatement::Invalid(ref h) => { // The peer can only accept Valid and Invalid statements for which it is aware // of the corresponding candidate. if !self.known_candidates.contains(h) { - return false; + return None; } + + (h.clone(), false) } - } + }; self.sent_statements.insert(fingerprint.clone()); - true + + if new_known { + Some(Some(candidate_hash.clone())) + } else { + Some(None) + } } /// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on @@ -196,13 +206,16 @@ impl PeerRelayParentKnowledge { /// This returns an error if the peer should not have sent us this message according to protocol /// rules for flood protection. /// - /// If this returns `Ok(())`, the internal state has been altered. After `receive`ing a new + /// If this returns `Ok`, the internal state has been altered. After `receive`ing a new /// candidate, we are then cleared to send the peer further statements about that candidate. + /// + /// This returns `Ok(Some(hash))` if this is the first time the peer has become aware of a + /// candidate with given hash. fn receive( &mut self, fingerprint: &(CompactStatement, ValidatorIndex), max_message_count: usize, - ) -> Result<(), Rep> { + ) -> Result, Rep> { // We don't check `sent_statements` because a statement could be in-flight from both // sides at the same time. if self.received_statements.contains(fingerprint) { @@ -243,9 +256,11 @@ impl PeerRelayParentKnowledge { } self.received_statements.insert(fingerprint.clone()); - self.known_candidates.insert(candidate_hash.clone()); - - Ok(()) + if self.known_candidates.insert(candidate_hash.clone()) { + Ok(Some(candidate_hash.clone())) + } else { + Ok(None) + } } } @@ -259,17 +274,20 @@ impl PeerData { /// Attempt to update our view of the peer's knowledge with this statement's fingerprint based /// on something that we would like to send to the peer. /// - /// This returns `false` if the peer cannot accept this statement, without altering internal + /// This returns `None` if the peer cannot accept this statement, without altering internal /// state. /// - /// If the peer can accept the statement, this returns `true` and updates the internal state. + /// If the peer can accept the statement, this returns `Some` and updates the internal state. /// Once the knowledge has incorporated a statement, it cannot be incorporated again. + /// + /// This returns `Some(Some(hash))` if this is the first time the peer has become aware of a + /// candidate with a given hash. fn send( &mut self, relay_parent: &Hash, fingerprint: &(CompactStatement, ValidatorIndex), - ) -> bool { - self.view_knowledge.get_mut(relay_parent).map_or(false, |k| k.send(fingerprint)) + ) -> Option> { + self.view_knowledge.get_mut(relay_parent).map_or(None, |k| k.send(fingerprint)) } /// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on @@ -283,14 +301,17 @@ impl PeerData { /// This returns an error if the peer should not have sent us this message according to protocol /// rules for flood protection. /// - /// If this returns `Ok(())`, the internal state has been altered. After `receive`ing a new + /// If this returns `Ok`, the internal state has been altered. After `receive`ing a new /// candidate, we are then cleared to send the peer further statements about that candidate. + /// + /// This returns `Ok(Some(hash))` if this is the first time the peer has become aware of a + /// candidate with given hash. fn receive( &mut self, relay_parent: &Hash, fingerprint: &(CompactStatement, ValidatorIndex), max_message_count: usize, - ) -> Result<(), Rep> { + ) -> Result, Rep> { self.view_knowledge.get_mut(relay_parent).ok_or(COST_UNEXPECTED_STATEMENT)? .receive(fingerprint, max_message_count) } @@ -453,15 +474,14 @@ async fn send_stored_to_peers( stored: &StoredStatement, ) -> SubsystemResult<()> { let fingerprint = stored.fingerprint(); + let peers_to_send: Vec<_> = peers.iter_mut() - .filter_map(|(p, data)| if data.send(&relay_parent, &fingerprint) { + .filter_map(|(p, data)| data.send(&relay_parent, &fingerprint).map(|fresh| { // TODO [now]: if this message indicates a fresh candidate, send // unlocked messages to the peer. - Some(p.clone()) - } else { - None - }) + p.clone() + })) .collect(); if peers_to_send.is_empty() { return Ok(()) } @@ -535,9 +555,10 @@ async fn handle_incoming_message<'a>( report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await?; return Ok(None) } - Ok(()) => { + Ok(Some(hash)) => { // TODO [now]: trigger send of new messages if this is a `Candidate` message. } + Ok(None) => {} } // Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation From 1be7e37a33a8027fb062596689ee2df2056acb16 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 1 Jul 2020 00:42:01 -0400 Subject: [PATCH 09/26] send dependents after circulating --- .../network/statement-distribution/src/lib.rs | 172 ++++++++++++------ primitives/src/parachain.rs | 12 ++ 2 files changed, 130 insertions(+), 54 deletions(-) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 18de1d217c26..363a55b284db 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -157,9 +157,9 @@ impl PeerRelayParentKnowledge { /// If the peer can accept the statement, this returns `Some` and updates the internal state. /// Once the knowledge has incorporated a statement, it cannot be incorporated again. /// - /// This returns `Some(Some(hash))` if this is the first time the peer has become aware of a - /// candidate with a given hash. - fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> Option> { + /// This returns `Some(true)` if this is the first time the peer has become aware of a + /// candidate with the given hash. + fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> Option { let already_known = self.sent_statements.contains(fingerprint) || self.received_statements.contains(fingerprint); @@ -167,13 +167,13 @@ impl PeerRelayParentKnowledge { return None; } - let (candidate_hash, new_known) = match fingerprint.0 { + let new_known = match fingerprint.0 { CompactStatement::Candidate(ref h) => { self.seconded_counts.entry(fingerprint.1) .or_insert_with(Default::default) .note_local(h.clone()); - (h.clone(), self.known_candidates.insert(h.clone())) + self.known_candidates.insert(h.clone()) }, CompactStatement::Valid(ref h) | CompactStatement::Invalid(ref h) => { // The peer can only accept Valid and Invalid statements for which it is aware @@ -182,17 +182,13 @@ impl PeerRelayParentKnowledge { return None; } - (h.clone(), false) + false } }; self.sent_statements.insert(fingerprint.clone()); - if new_known { - Some(Some(candidate_hash.clone())) - } else { - Some(None) - } + Some(new_known) } /// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on @@ -209,13 +205,13 @@ impl PeerRelayParentKnowledge { /// If this returns `Ok`, the internal state has been altered. After `receive`ing a new /// candidate, we are then cleared to send the peer further statements about that candidate. /// - /// This returns `Ok(Some(hash))` if this is the first time the peer has become aware of a + /// This returns `Ok(true)` if this is the first time the peer has become aware of a /// candidate with given hash. fn receive( &mut self, fingerprint: &(CompactStatement, ValidatorIndex), max_message_count: usize, - ) -> Result, Rep> { + ) -> Result { // We don't check `sent_statements` because a statement could be in-flight from both // sides at the same time. if self.received_statements.contains(fingerprint) { @@ -256,11 +252,7 @@ impl PeerRelayParentKnowledge { } self.received_statements.insert(fingerprint.clone()); - if self.known_candidates.insert(candidate_hash.clone()) { - Ok(Some(candidate_hash.clone())) - } else { - Ok(None) - } + Ok(self.known_candidates.insert(candidate_hash.clone())) } } @@ -280,13 +272,13 @@ impl PeerData { /// If the peer can accept the statement, this returns `Some` and updates the internal state. /// Once the knowledge has incorporated a statement, it cannot be incorporated again. /// - /// This returns `Some(Some(hash))` if this is the first time the peer has become aware of a - /// candidate with a given hash. + /// This returns `Some(true)` if this is the first time the peer has become aware of a + /// candidate with the given hash. fn send( &mut self, relay_parent: &Hash, fingerprint: &(CompactStatement, ValidatorIndex), - ) -> Option> { + ) -> Option { self.view_knowledge.get_mut(relay_parent).map_or(None, |k| k.send(fingerprint)) } @@ -304,14 +296,14 @@ impl PeerData { /// If this returns `Ok`, the internal state has been altered. After `receive`ing a new /// candidate, we are then cleared to send the peer further statements about that candidate. /// - /// This returns `Ok(Some(hash))` if this is the first time the peer has become aware of a + /// This returns `Ok(true)` if this is the first time the peer has become aware of a /// candidate with given hash. fn receive( &mut self, relay_parent: &Hash, fingerprint: &(CompactStatement, ValidatorIndex), max_message_count: usize, - ) -> Result, Rep> { + ) -> Result { self.view_knowledge.get_mut(relay_parent).ok_or(COST_UNEXPECTED_STATEMENT)? .receive(fingerprint, max_message_count) } @@ -427,6 +419,13 @@ impl ActiveHeadData { fn statements(&self) -> impl Iterator + '_ { self.seconded_statements.iter().chain(self.other_statements.iter()) } + + /// Get an iterator over all statements for the active head that are for a particular candidate. + fn statements_about(&self, candidate_hash: Hash) + -> impl Iterator + '_ + { + self.statements().filter(move |s| s.compact.candidate_hash() == &candidate_hash) + } } /// Check a statement signature under this parent hash. @@ -452,47 +451,111 @@ enum WireMessage { Statement(Hash, SignedFullStatement), } -async fn share_message( +/// Places the statement in storage if it is new, and then +/// circulates the statement to all peers who have not seen it yet, and +/// sends all statements dependent on that statement to peers who could previously not receive +/// them but now can. +async fn circulate_statement_and_dependents( peers: &mut HashMap, active_heads: &mut HashMap, ctx: &mut impl SubsystemContext, relay_parent: Hash, statement: SignedFullStatement, ) -> SubsystemResult<()> { - if let Some(stored) - = active_heads.get_mut(&relay_parent).and_then(|d| d.note_statement(statement)) - { - send_stored_to_peers(peers, ctx, relay_parent, stored).await?; + if let Some(active_head)= active_heads.get_mut(&relay_parent) { + + // First circulate the statement directly to all peers needing it. + // The borrow of `active_head` needs to encompass only this (Rust) statement. + let outputs: Option<(Hash, Vec)> = { + match active_head.note_statement(statement) { + Some(stored) => Some(( + stored.compact.candidate_hash().clone(), + circulate_statement(peers, ctx, relay_parent, stored).await?, + )), + None => None, + } + }; + + // Now send dependent statements to all peers needing them, if any. + if let Some((candidate_hash, peers_needing_dependents)) = outputs { + for peer in peers_needing_dependents { + if let Some(peer_data) = peers.get_mut(&peer) { + // defensive: the peer data should always be some because the iterator + // of peers is derived from the set of peers. + send_statements_about( + peer, + peer_data, + ctx, + relay_parent, + candidate_hash, + &*active_head + ).await?; + } + } + } } + Ok(()) } -async fn send_stored_to_peers( +/// Circulates a statement to all peers who have not seen it yet, and returns +/// an iterator over peers who need to have dependent statements sent. +async fn circulate_statement( peers: &mut HashMap, ctx: &mut impl SubsystemContext, relay_parent: Hash, stored: &StoredStatement, -) -> SubsystemResult<()> { +) -> SubsystemResult> { let fingerprint = stored.fingerprint(); - let peers_to_send: Vec<_> = peers.iter_mut() - .filter_map(|(p, data)| data.send(&relay_parent, &fingerprint).map(|fresh| { - // TODO [now]: if this message indicates a fresh candidate, send - // unlocked messages to the peer. + let mut peers_to_send = HashMap::new(); - p.clone() - })) - .collect(); + for (peer, data) in peers.iter_mut() { + if let Some(new_known) = data.send(&relay_parent, &fingerprint) { + peers_to_send.insert(peer.clone(), new_known); + } + } - if peers_to_send.is_empty() { return Ok(()) } + // Send all these peers the initial statement. + if !peers_to_send.is_empty() { + let payload = WireMessage::Statement(relay_parent, stored.statement.clone()).encode(); + ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( + peers_to_send.keys().cloned().collect(), + PROTOCOL_V1, + payload, + ))).await?; + } + Ok(peers_to_send.into_iter().filter_map(|(peer, needs_dependent)| if needs_dependent { + Some(peer) + } else { + None + }).collect()) +} - let payload = WireMessage::Statement(relay_parent, stored.statement.clone()).encode(); - ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( - peers_to_send, - PROTOCOL_V1, - payload, - ))).await?; +/// Send all statements about a given candidate hash to a peer. +async fn send_statements_about( + peer: PeerId, + peer_data: &mut PeerData, + ctx: &mut impl SubsystemContext, + relay_parent: Hash, + candidate_hash: Hash, + active_head: &ActiveHeadData, +) -> SubsystemResult<()> { + for statement in active_head.statements_about(candidate_hash) { + if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() { + let payload = WireMessage::Statement( + relay_parent, + statement.statement.clone(), + ).encode(); + + ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( + vec![peer.clone()], + PROTOCOL_V1, + payload, + ))).await?; + } + } Ok(()) } @@ -555,10 +618,10 @@ async fn handle_incoming_message<'a>( report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await?; return Ok(None) } - Ok(Some(hash)) => { + Ok(true) => { // TODO [now]: trigger send of new messages if this is a `Candidate` message. } - Ok(None) => {} + Ok(false) => {} } // Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation @@ -612,7 +675,7 @@ async fn handle_network_update( NetworkBridgeEvent::PeerViewChange(peer, view) => { // TODO [now] // 1. Update the view. - // 2. Send this peer all messages that we have for new active heads in the view. + // 2. Send this peer all messages that we have for active heads in the view. Ok(()) } NetworkBridgeEvent::OurViewChange(view) => { @@ -650,13 +713,14 @@ async fn run( } FromOverseer::Signal(OverseerSignal::Conclude) => break, FromOverseer::Communication { msg } => match msg { - StatementDistributionMessage::Share(relay_parent, statement) => share_message( - &mut peers, - &mut active_heads, - &mut ctx, - relay_parent, - statement, - ).await?, + StatementDistributionMessage::Share(relay_parent, statement) => + circulate_statement_and_dependents( + &mut peers, + &mut active_heads, + &mut ctx, + relay_parent, + statement, + ).await?, StatementDistributionMessage::NetworkBridgeUpdate(event) => handle_network_update( &mut peers, &mut active_heads, diff --git a/primitives/src/parachain.rs b/primitives/src/parachain.rs index 427c64bfd48d..56bb0ea6ef47 100644 --- a/primitives/src/parachain.rs +++ b/primitives/src/parachain.rs @@ -604,6 +604,18 @@ pub enum CompactStatement { Invalid(Hash), } +impl CompactStatement { + /// Get the underlying candidate hash this references. + pub fn candidate_hash(&self) -> &Hash { + match *self { + CompactStatement::Candidate(ref h) + | CompactStatement::Valid(ref h) + | CompactStatement::Invalid(ref h) + => h + } + } +} + /// A signed compact statement, suitable to be sent to the chain. pub type SignedStatement = Signed; From 47f2df81d7d4d5d3cfdd0bf3f4e2980f807dcfbb Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 1 Jul 2020 00:44:47 -0400 Subject: [PATCH 10/26] add another TODO --- node/network/statement-distribution/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 363a55b284db..df1a488cf515 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -705,6 +705,7 @@ async fn run( let message = ctx.recv().await?; match message { FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)) => { + // TODO [now]: set these up with non-empty values. active_heads.entry(relay_parent) .or_insert(ActiveHeadData::new(unimplemented!(), unimplemented!())); } From cd30c21e1b535a16fb22fcc263b274346bfd0f91 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 1 Jul 2020 00:47:23 -0400 Subject: [PATCH 11/26] trigger send in one more place --- node/network/statement-distribution/src/lib.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index df1a488cf515..14eee1efaf28 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -619,7 +619,16 @@ async fn handle_incoming_message<'a>( return Ok(None) } Ok(true) => { - // TODO [now]: trigger send of new messages if this is a `Candidate` message. + // Send the peer all statements concerning the candidate that we have, + // since it appears to have just learned about the candidate. + send_statements_about( + peer, + peer_data, + ctx, + relay_parent, + fingerprint.0.candidate_hash().clone(), + &*active_head, + ).await? } Ok(false) => {} } From a39ad108462744cdd47e439e08abbbcc22165423 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 1 Jul 2020 10:45:24 -0400 Subject: [PATCH 12/26] refactors from review --- .../network/statement-distribution/src/lib.rs | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 14eee1efaf28..c64d7a9b1c76 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -101,14 +101,9 @@ impl VcPerPeerTracker { // Note that the remote should now be aware that a validator has seconded a given candidate (by hash) // based on a message that we have sent it from our local pool. fn note_local(&mut self, h: Hash) { - if self.local_observed.contains(&h) { return } - - if self.local_observed.is_full() { + if !note_hash(&mut self.local_observed, h) { log::warn!("Statement distribution is erroneously attempting to distribute more \ than {} candidate(s) per validator index. Ignoring", VC_THRESHOLD); - } else { - self.local_observed.try_push(h).expect("length of storage guarded above; \ - only panics if length exceeds capacity; qed"); } } @@ -117,16 +112,23 @@ impl VcPerPeerTracker { // // Returns `true` if the peer was allowed to send us such a message, `false` otherwise. fn note_remote(&mut self, h: Hash) -> bool { - if self.remote_observed.contains(&h) { return true; } + note_hash(&mut self.remote_observed, h) + } +} - if self.remote_observed.is_full() { - return false; - } else { - self.remote_observed.try_push(h).expect("length of storage guarded above; \ - only panics if length exceeds capacity; qed"); +fn note_hash( + observed: &mut arrayvec::ArrayVec<[Hash; VC_THRESHOLD]>, + h: Hash, +) -> bool { + if observed.contains(&h) { return true; } - true - } + if observed.is_full() { + false + } else { + observed.try_push(h).expect("length of storage guarded above; \ + only panics if length exceeds capacity; qed"); + + true } } @@ -170,7 +172,7 @@ impl PeerRelayParentKnowledge { let new_known = match fingerprint.0 { CompactStatement::Candidate(ref h) => { self.seconded_counts.entry(fingerprint.1) - .or_insert_with(Default::default) + .or_default() .note_local(h.clone()); self.known_candidates.insert(h.clone()) From 751122b514572a2a3413f7800a9e05f8fcb941f2 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 1 Jul 2020 14:08:14 -0400 Subject: [PATCH 13/26] send new statements to candidate backing --- node/network/statement-distribution/src/lib.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index c64d7a9b1c76..bdb1c6645047 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -25,7 +25,7 @@ use polkadot_subsystem::{ }; use polkadot_subsystem::messages::{ AllMessages, NetworkBridgeMessage, NetworkBridgeEvent, StatementDistributionMessage, - PeerId, ObservedRole, ReputationChange as Rep, + PeerId, ObservedRole, ReputationChange as Rep, CandidateBackingMessage, }; use node_primitives::{ProtocolId, View, SignedFullStatement}; use polkadot_primitives::Hash; @@ -673,8 +673,13 @@ async fn handle_network_update( message, ).await?; - if let Some(new) = new_stored { - // TODO [now]: send to `CandidateBacking` subsystem. + if let Some((relay_parent, new)) = new_stored { + // When we receive a new message from a peer, we forward it to the + // candidate backing subsystem. + let message = AllMessages::CandidateBacking( + CandidateBackingMessage::Statement(relay_parent, new.statement.clone()) + ); + ctx.send_message(message).await?; } Ok(()) From d413426f1e2d9839a03639d5f9a4a247662feaf9 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 1 Jul 2020 14:15:58 -0400 Subject: [PATCH 14/26] instantiate active head data with runtime API values --- .../network/statement-distribution/src/lib.rs | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index bdb1c6645047..9757d84674d8 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -25,7 +25,8 @@ use polkadot_subsystem::{ }; use polkadot_subsystem::messages::{ AllMessages, NetworkBridgeMessage, NetworkBridgeEvent, StatementDistributionMessage, - PeerId, ObservedRole, ReputationChange as Rep, CandidateBackingMessage, + PeerId, ObservedRole, ReputationChange as Rep, CandidateBackingMessage, RuntimeApiMessage, + RuntimeApiRequest, }; use node_primitives::{ProtocolId, View, SignedFullStatement}; use polkadot_primitives::Hash; @@ -35,6 +36,7 @@ use polkadot_primitives::parachain::{ use parity_scale_codec::{Encode, Decode}; use futures::prelude::*; +use futures::channel::oneshot; use std::collections::{HashMap, HashSet}; @@ -721,9 +723,26 @@ async fn run( let message = ctx.recv().await?; match message { FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)) => { - // TODO [now]: set these up with non-empty values. + let (validators, session_index) = { + let (val_tx, val_rx) = oneshot::channel(); + let (session_tx, session_rx) = oneshot::channel(); + + let val_message = AllMessages::RuntimeApi( + RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Validators(val_tx)), + ); + let session_message = AllMessages::RuntimeApi( + RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::SigningContext(session_tx)), + ); + + ctx.send_messages( + std::iter::once(val_message).chain(std::iter::once(session_message)) + ).await?; + + (val_rx.await?, session_rx.await?.session_index) + }; + active_heads.entry(relay_parent) - .or_insert(ActiveHeadData::new(unimplemented!(), unimplemented!())); + .or_insert(ActiveHeadData::new(validators, session_index)); } FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)) => { // do nothing - we will handle this when our view changes. From 7b82e1bdb3d950d24e701c6e032d4879fff778c0 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 1 Jul 2020 14:40:42 -0400 Subject: [PATCH 15/26] track our view changes and peer view changes --- .../network/statement-distribution/src/lib.rs | 122 +++++++++++++----- 1 file changed, 93 insertions(+), 29 deletions(-) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 9757d84674d8..0fac04d8976a 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -20,12 +20,12 @@ //! validity amongst validators. use polkadot_subsystem::{ - Subsystem, SubsystemResult, SubsystemError, SubsystemContext, SpawnedSubsystem, + Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem, FromOverseer, OverseerSignal, }; use polkadot_subsystem::messages::{ AllMessages, NetworkBridgeMessage, NetworkBridgeEvent, StatementDistributionMessage, - PeerId, ObservedRole, ReputationChange as Rep, CandidateBackingMessage, RuntimeApiMessage, + PeerId, ReputationChange as Rep, CandidateBackingMessage, RuntimeApiMessage, RuntimeApiRequest, }; use node_primitives::{ProtocolId, View, SignedFullStatement}; @@ -57,16 +57,6 @@ const BENEFIT_VALID_STATEMENT: Rep = Rep::new(25, "Peer provided a valid stateme /// Typically we will only keep 1, but when a validator equivocates we will need to track 2. const VC_THRESHOLD: usize = 2; -/// The maximum amount of candidates each peer can be aware of each validator seconding at -/// any relay-parent. Short for "Validator Candidate per Peer Threshold". -/// -/// This is 2 times the `VC_THRESHOLD` because it includes the candidates in -/// our state that we may have sent them, and the candidates that they may have received from -/// other peers in the meantime. Peers are unlikely to ever be aware of more than 2 candidates -/// except in the case of a targeted attack by a sophisticated adversary. Nevertheless, we -/// establish a finite bound on memory used in such a situation. -const VC_PEER_THRESHOLD: usize = 2 * VC_THRESHOLD; - /// The statement distribution subsystem. pub struct StatementDistribution; @@ -96,10 +86,6 @@ struct VcPerPeerTracker { } impl VcPerPeerTracker { - fn contains(&self, h: &Hash) -> bool { - self.local_observed.contains(h) || self.remote_observed.contains(h) - } - // Note that the remote should now be aware that a validator has seconded a given candidate (by hash) // based on a message that we have sent it from our local pool. fn note_local(&mut self, h: Hash) { @@ -135,6 +121,7 @@ fn note_hash( } // knowledge that a peer has about goings-on in a relay parent. +#[derive(Default)] struct PeerRelayParentKnowledge { // candidates that the peer is aware of. This indicates that we can // send other statements pertaining to that candidate. @@ -261,7 +248,6 @@ impl PeerRelayParentKnowledge { } struct PeerData { - role: ObservedRole, view: View, view_knowledge: HashMap, } @@ -564,6 +550,32 @@ async fn send_statements_about( Ok(()) } +/// Send all statements at a given relay-parent to a peer. +async fn send_statements( + peer: PeerId, + peer_data: &mut PeerData, + ctx: &mut impl SubsystemContext, + relay_parent: Hash, + active_head: &ActiveHeadData +) -> SubsystemResult<()> { + for statement in active_head.statements() { + if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() { + let payload = WireMessage::Statement( + relay_parent, + statement.statement.clone(), + ).encode(); + + ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( + vec![peer.clone()], + PROTOCOL_V1, + payload, + ))).await?; + } + } + + Ok(()) +} + async fn report_peer( ctx: &mut impl SubsystemContext, peer: PeerId, @@ -618,8 +630,8 @@ async fn handle_incoming_message<'a>( let fingerprint = (statement.payload().to_compact(), statement.validator_index()); let max_message_count = active_head.validators.len() * 2; match peer_data.receive(&relay_parent, &fingerprint, max_message_count) { - Err(e) => { - report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await?; + Err(rep) => { + report_peer(ctx, peer, rep).await?; return Ok(None) } Ok(true) => { @@ -639,9 +651,46 @@ async fn handle_incoming_message<'a>( // Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation // or unpinned to a seconded candidate. So it is safe to place it into the storage. + // TODO [now]: reward the peer if the statement was new or something we'd be interested in. + // slightly lower reward for losing races. Ok(active_head.note_statement(statement).map(|s| (relay_parent, s))) } +/// Update a peer's view. Sends all newly unlocked statements based on the previous +async fn update_peer_view_and_send_unlocked( + peer: PeerId, + peer_data: &mut PeerData, + ctx: &mut impl SubsystemContext, + active_heads: &HashMap, + new_view: View, +) -> SubsystemResult<()> { + let old_view = std::mem::replace(&mut peer_data.view, new_view); + + // Remove entries for all relay-parents in the old view but not the new. + for removed in old_view.difference(&peer_data.view) { + let _ = peer_data.view_knowledge.remove(&removed); + } + + // Add entries for all relay-parents in the new view but not the old. + // Furthermore, send all statements we have for those relay parents. + let new_view = peer_data.view.difference(&old_view).collect::>(); + for new in new_view.iter().copied() { + peer_data.view_knowledge.insert(new, Default::default()); + + if let Some(active_head) = active_heads.get(&new) { + send_statements( + peer.clone(), + peer_data, + ctx, + new, + active_head, + ).await?; + } + } + + Ok(()) +} + async fn handle_network_update( peers: &mut HashMap, active_heads: &mut HashMap, @@ -650,9 +699,8 @@ async fn handle_network_update( update: NetworkBridgeEvent, ) -> SubsystemResult<()> { match update { - NetworkBridgeEvent::PeerConnected(peer, role) => { + NetworkBridgeEvent::PeerConnected(peer, _role) => { peers.insert(peer, PeerData { - role, view: Default::default(), view_knowledge: Default::default(), }); @@ -691,15 +739,31 @@ async fn handle_network_update( } NetworkBridgeEvent::PeerViewChange(peer, view) => { - // TODO [now] - // 1. Update the view. - // 2. Send this peer all messages that we have for active heads in the view. - Ok(()) + match peers.get_mut(&peer) { + Some(data) => { + update_peer_view_and_send_unlocked( + peer, + data, + ctx, + &*active_heads, + view, + ).await + } + None => Ok(()), + } } NetworkBridgeEvent::OurViewChange(view) => { - // TODO [now] - // 1. Update our view. - // 2. Clean up everything that is not in the new view. + let old_view = std::mem::replace(our_view, view); + active_heads.retain(|head, _| our_view.contains(head)); + + for new in our_view.difference(&old_view) { + if !active_heads.contains_key(&new) { + log::warn!(target: "statement_distribution", "Our network bridge view update \ + inconsistent with `StartWork` messages we have received from overseer. \ + Contains unknown hash {}", new); + } + } + Ok(()) } } @@ -744,7 +808,7 @@ async fn run( active_heads.entry(relay_parent) .or_insert(ActiveHeadData::new(validators, session_index)); } - FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)) => { + FromOverseer::Signal(OverseerSignal::StopWork(_relay_parent)) => { // do nothing - we will handle this when our view changes. } FromOverseer::Signal(OverseerSignal::Conclude) => break, From 94a69fffac7325ab3a0bc60201d32f3ce3318306 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 1 Jul 2020 14:46:33 -0400 Subject: [PATCH 16/26] apply a benefit to peers who send us statements we want --- .../network/statement-distribution/src/lib.rs | 46 ++++++++++++++----- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 0fac04d8976a..d13473a7aa86 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -48,7 +48,11 @@ const COST_INVALID_MESSAGE: Rep = Rep::new(-500, "Invalid message"); const COST_DUPLICATE_STATEMENT: Rep = Rep::new(-250, "Statement sent more than once by peer"); const COST_APPARENT_FLOOD: Rep = Rep::new(-1000, "Peer appears to be flooding us with statements"); -const BENEFIT_VALID_STATEMENT: Rep = Rep::new(25, "Peer provided a valid statement"); +const BENEFIT_VALID_STATEMENT: Rep = Rep::new(5, "Peer provided a valid statement"); +const BENEFIT_VALID_STATEMENT_FIRST: Rep = Rep::new( + 25, + "Peer was the first to provide a valid statement", +); /// The maximum amount of candidates each validator is allowed to second at any relay-parent. /// Short for "Validator Candidate Threshold". @@ -327,6 +331,12 @@ impl std::hash::Hash for StoredStatement { } } +enum NotedStatement<'a> { + NotUseful, + Fresh(&'a StoredStatement), + UsefulButKnown +} + struct ActiveHeadData { /// All candidates we are aware of for this head, keyed by hash. candidates: HashSet, @@ -365,7 +375,7 @@ impl ActiveHeadData { /// the signature is valid. /// /// Any other statements or those that reference a candidate we are not aware of cannot be accepted. - fn note_statement(&mut self, statement: SignedFullStatement) -> Option<&StoredStatement> { + fn note_statement(&mut self, statement: SignedFullStatement) -> NotedStatement { let compact = statement.payload().to_compact(); let validator_index = statement.validator_index(); let stored = StoredStatement { @@ -377,7 +387,7 @@ impl ActiveHeadData { CompactStatement::Candidate(h) => { let seconded_so_far = self.seconded_counts.entry(validator_index).or_insert(0); if *seconded_so_far >= 2 { - return None; + return NotedStatement::NotUseful; } else { *seconded_so_far += 1; } @@ -385,21 +395,23 @@ impl ActiveHeadData { self.candidates.insert(h); if self.seconded_statements.insert(stored) { // This will always return `Some` because it was just inserted. - self.seconded_statements.get(&compact) + NotedStatement::Fresh(self.seconded_statements.get(&compact) + .expect("Statement was just inserted; qed")) } else { - None + NotedStatement::UsefulButKnown } } CompactStatement::Valid(h) | CompactStatement::Invalid(h) => { if !self.candidates.contains(&h) { - return None; + return NotedStatement::NotUseful; } if self.other_statements.insert(stored) { // This will always return `Some` because it was just inserted. - self.other_statements.get(&compact) + NotedStatement::Fresh(self.other_statements.get(&compact) + .expect("Statement was just inserted; qed")) } else { - None + NotedStatement::UsefulButKnown } } } @@ -458,11 +470,11 @@ async fn circulate_statement_and_dependents( // The borrow of `active_head` needs to encompass only this (Rust) statement. let outputs: Option<(Hash, Vec)> = { match active_head.note_statement(statement) { - Some(stored) => Some(( + NotedStatement::Fresh(stored) => Some(( stored.compact.candidate_hash().clone(), circulate_statement(peers, ctx, relay_parent, stored).await?, )), - None => None, + _ => None, } }; @@ -638,7 +650,7 @@ async fn handle_incoming_message<'a>( // Send the peer all statements concerning the candidate that we have, // since it appears to have just learned about the candidate. send_statements_about( - peer, + peer.clone(), peer_data, ctx, relay_parent, @@ -653,7 +665,17 @@ async fn handle_incoming_message<'a>( // or unpinned to a seconded candidate. So it is safe to place it into the storage. // TODO [now]: reward the peer if the statement was new or something we'd be interested in. // slightly lower reward for losing races. - Ok(active_head.note_statement(statement).map(|s| (relay_parent, s))) + match active_head.note_statement(statement) { + NotedStatement::NotUseful => Ok(None), + NotedStatement::UsefulButKnown => { + report_peer(ctx, peer, BENEFIT_VALID_STATEMENT).await?; + Ok(None) + } + NotedStatement::Fresh(statement) => { + report_peer(ctx, peer, BENEFIT_VALID_STATEMENT_FIRST).await?; + Ok(Some((relay_parent, statement))) + } + } } /// Update a peer's view. Sends all newly unlocked statements based on the previous From a3b4b6a63e672b82b40072f3f8da1319a5ec9445 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 1 Jul 2020 14:51:29 -0400 Subject: [PATCH 17/26] remove unneeded TODO --- node/network/statement-distribution/src/lib.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index d13473a7aa86..a78a5897ea96 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -663,8 +663,6 @@ async fn handle_incoming_message<'a>( // Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation // or unpinned to a seconded candidate. So it is safe to place it into the storage. - // TODO [now]: reward the peer if the statement was new or something we'd be interested in. - // slightly lower reward for losing races. match active_head.note_statement(statement) { NotedStatement::NotUseful => Ok(None), NotedStatement::UsefulButKnown => { From eb51e80a53353315c222fba17a2462e3f292973a Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 1 Jul 2020 14:58:20 -0400 Subject: [PATCH 18/26] add some comments and improve Hash implementation --- .../network/statement-distribution/src/lib.rs | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index a78a5897ea96..6d0a7e7c1b02 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -327,7 +327,15 @@ impl std::borrow::Borrow for StoredStatement { impl std::hash::Hash for StoredStatement { fn hash(&self, state: &mut H) { - self.fingerprint().hash(state) + // Hash + Eq is supposed to have the property that if `Hash(X) == Hash(Y)`, then `X == Y`. + // However, the candidate receipt types don't implement `Hash`. + // + // The fingerprint plus the signature data provides an assurance that this property will hold, + // not in general, but for all use-cases where the data within this struct has had the + // signature checked, even when validators equivocate by issuing two different signatures + // on the same data, although I am unsure if that is even possible with sr25519. + self.fingerprint().hash(state); + self.statement.signature().hash(state); } } @@ -853,3 +861,14 @@ async fn run( } Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + // TODO [now]: active head data accepts 2 seconded messages per validator + // TODO [now]: note_local + // TODO [now]: note_remote + // TODO [now]: peer view update leads to messages being sent + // TODO [now]: circulating statement goes to all peers. +} From b9876ae3fb49030ef8a3e78023117d1aa953647a Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 1 Jul 2020 21:04:08 -0400 Subject: [PATCH 19/26] start tests and fix `note_statement` --- Cargo.lock | 1 + .../network/statement-distribution/Cargo.toml | 1 + .../network/statement-distribution/src/lib.rs | 148 ++++++++++++++---- 3 files changed, 119 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b7aea203110b..80ebc2abb7db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4800,6 +4800,7 @@ dependencies = [ "polkadot-node-subsystem", "polkadot-primitives", "polkadot-subsystem-test-helpers", + "sp-keyring", "sp-runtime", "sp-staking", "streamunordered", diff --git a/node/network/statement-distribution/Cargo.toml b/node/network/statement-distribution/Cargo.toml index 42b26d460430..a76efab573d9 100644 --- a/node/network/statement-distribution/Cargo.toml +++ b/node/network/statement-distribution/Cargo.toml @@ -22,3 +22,4 @@ arrayvec = "0.5.1" parking_lot = "0.10.0" subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" } assert_matches = "1.3.0" +sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 6d0a7e7c1b02..d467eb6b9398 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -31,7 +31,7 @@ use polkadot_subsystem::messages::{ use node_primitives::{ProtocolId, View, SignedFullStatement}; use polkadot_primitives::Hash; use polkadot_primitives::parachain::{ - CompactStatement, ValidatorIndex, ValidatorId, SigningContext, + CompactStatement, ValidatorIndex, ValidatorId, SigningContext, ValidatorSignature, }; use parity_scale_codec::{Encode, Decode}; @@ -304,41 +304,54 @@ impl PeerData { } // A statement stored while a relay chain head is active. -// -// These are orderable first by (Seconded, Valid, Invalid), then by the underlying hash, -// and lastly by the signing validator's index. -#[derive(PartialEq, Eq)] struct StoredStatement { - compact: CompactStatement, + comparator: StoredStatementComparator, statement: SignedFullStatement, } +// A value used for comparison of stored statements to each other. +// +// The compact version of the statement, the validator index, and the signature of the validator +// is enough to differentiate between all types of equivocations, as long as the signature is +// actually checked to be valid. The same statement with 2 signatures and 2 statements with +// different (or same) signatures wll all be correctly judged to be unequal with this comparator. +#[derive(PartialEq, Eq, Hash, Clone)] +struct StoredStatementComparator { + compact: CompactStatement, + validator_index: ValidatorIndex, + signature: ValidatorSignature, +} + impl StoredStatement { + fn compact(&self) -> &CompactStatement { + &self.comparator.compact + } + fn fingerprint(&self) -> (CompactStatement, ValidatorIndex) { - (self.compact.clone(), self.statement.validator_index()) + (self.comparator.compact.clone(), self.statement.validator_index()) } } -impl std::borrow::Borrow for StoredStatement { - fn borrow(&self) -> &CompactStatement { - &self.compact +impl std::borrow::Borrow for StoredStatement { + fn borrow(&self) -> &StoredStatementComparator { + &self.comparator } } impl std::hash::Hash for StoredStatement { fn hash(&self, state: &mut H) { - // Hash + Eq is supposed to have the property that if `Hash(X) == Hash(Y)`, then `X == Y`. - // However, the candidate receipt types don't implement `Hash`. - // - // The fingerprint plus the signature data provides an assurance that this property will hold, - // not in general, but for all use-cases where the data within this struct has had the - // signature checked, even when validators equivocate by issuing two different signatures - // on the same data, although I am unsure if that is even possible with sr25519. - self.fingerprint().hash(state); - self.statement.signature().hash(state); + self.comparator.hash(state) } } +impl std::cmp::PartialEq for StoredStatement { + fn eq(&self, other: &Self) -> bool { + &self.comparator == &other.comparator + } +} + +impl std::cmp::Eq for StoredStatement {} + enum NotedStatement<'a> { NotUseful, Fresh(&'a StoredStatement), @@ -384,14 +397,19 @@ impl ActiveHeadData { /// /// Any other statements or those that reference a candidate we are not aware of cannot be accepted. fn note_statement(&mut self, statement: SignedFullStatement) -> NotedStatement { - let compact = statement.payload().to_compact(); let validator_index = statement.validator_index(); + let comparator = StoredStatementComparator { + compact: statement.payload().to_compact(), + validator_index, + signature: statement.signature().clone(), + }; + let stored = StoredStatement { - compact: compact.clone(), + comparator: comparator.clone(), statement, }; - match compact { + match comparator.compact { CompactStatement::Candidate(h) => { let seconded_so_far = self.seconded_counts.entry(validator_index).or_insert(0); if *seconded_so_far >= 2 { @@ -403,7 +421,7 @@ impl ActiveHeadData { self.candidates.insert(h); if self.seconded_statements.insert(stored) { // This will always return `Some` because it was just inserted. - NotedStatement::Fresh(self.seconded_statements.get(&compact) + NotedStatement::Fresh(self.seconded_statements.get(&comparator) .expect("Statement was just inserted; qed")) } else { NotedStatement::UsefulButKnown @@ -416,7 +434,7 @@ impl ActiveHeadData { if self.other_statements.insert(stored) { // This will always return `Some` because it was just inserted. - NotedStatement::Fresh(self.other_statements.get(&compact) + NotedStatement::Fresh(self.other_statements.get(&comparator) .expect("Statement was just inserted; qed")) } else { NotedStatement::UsefulButKnown @@ -434,7 +452,7 @@ impl ActiveHeadData { fn statements_about(&self, candidate_hash: Hash) -> impl Iterator + '_ { - self.statements().filter(move |s| s.compact.candidate_hash() == &candidate_hash) + self.statements().filter(move |s| s.compact().candidate_hash() == &candidate_hash) } } @@ -479,7 +497,7 @@ async fn circulate_statement_and_dependents( let outputs: Option<(Hash, Vec)> = { match active_head.note_statement(statement) { NotedStatement::Fresh(stored) => Some(( - stored.compact.candidate_hash().clone(), + stored.compact().candidate_hash().clone(), circulate_statement(peers, ctx, relay_parent, stored).await?, )), _ => None, @@ -865,10 +883,78 @@ async fn run( #[cfg(test)] mod tests { use super::*; + use sp_keyring::Sr25519Keyring; + use node_primitives::Statement; + use polkadot_primitives::parachain::{AbridgedCandidateReceipt}; + + #[test] + fn active_head_accepts_only_2_seconded_per_validator() { + let validators = vec![ + Sr25519Keyring::Alice.public().into(), + Sr25519Keyring::Bob.public().into(), + Sr25519Keyring::Charlie.public().into(), + ]; + let parent_hash: Hash = [1; 32].into(); + + let session_index = 1; + let signing_context = SigningContext { + parent_hash, + session_index, + }; + + let candidate_a = { + let mut c = AbridgedCandidateReceipt::default(); + c.relay_parent = parent_hash; + c.parachain_index = 1.into(); + c + }; + + let candidate_b = { + let mut c = AbridgedCandidateReceipt::default(); + c.relay_parent = parent_hash; + c.parachain_index = 2.into(); + c + }; + + let candidate_c = { + let mut c = AbridgedCandidateReceipt::default(); + c.relay_parent = parent_hash; + c.parachain_index = 3.into(); + c + }; - // TODO [now]: active head data accepts 2 seconded messages per validator - // TODO [now]: note_local - // TODO [now]: note_remote - // TODO [now]: peer view update leads to messages being sent - // TODO [now]: circulating statement goes to all peers. + let mut head_data = ActiveHeadData::new(validators, session_index); + + head_data.note_statement(SignedFullStatement::sign( + Statement::Seconded(candidate_a), + &signing_context, + 0, + &Sr25519Keyring::Alice.pair().into(), + )); + } + + #[test] + fn note_local_works() { + // TODO [now] + } + + #[test] + fn note_remote_works() { + // TODO [now] + } + + #[test] + fn peer_view_update_sends_messages() { + // TODO [now] + } + + #[test] + fn circulated_statement_goes_to_all_peers_with_view() { + // TODO [now] + } + + #[test] + fn smoke() { + // TODO [now] + } } From f605ed26f59ac6acb99756ed541d033652e81091 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 1 Jul 2020 21:16:17 -0400 Subject: [PATCH 20/26] test active_head seconding logic --- .../network/statement-distribution/src/lib.rs | 62 +++++++++++++++++-- 1 file changed, 57 insertions(+), 5 deletions(-) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index d467eb6b9398..d0fbaf20b621 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -304,6 +304,7 @@ impl PeerData { } // A statement stored while a relay chain head is active. +#[derive(Debug)] struct StoredStatement { comparator: StoredStatementComparator, statement: SignedFullStatement, @@ -315,7 +316,7 @@ struct StoredStatement { // is enough to differentiate between all types of equivocations, as long as the signature is // actually checked to be valid. The same statement with 2 signatures and 2 statements with // different (or same) signatures wll all be correctly judged to be unequal with this comparator. -#[derive(PartialEq, Eq, Hash, Clone)] +#[derive(PartialEq, Eq, Hash, Clone, Debug)] struct StoredStatementComparator { compact: CompactStatement, validator_index: ValidatorIndex, @@ -352,6 +353,7 @@ impl std::cmp::PartialEq for StoredStatement { impl std::cmp::Eq for StoredStatement {} +#[derive(Debug)] enum NotedStatement<'a> { NotUseful, Fresh(&'a StoredStatement), @@ -414,12 +416,12 @@ impl ActiveHeadData { let seconded_so_far = self.seconded_counts.entry(validator_index).or_insert(0); if *seconded_so_far >= 2 { return NotedStatement::NotUseful; - } else { - *seconded_so_far += 1; } self.candidates.insert(h); if self.seconded_statements.insert(stored) { + *seconded_so_far += 1; + // This will always return `Some` because it was just inserted. NotedStatement::Fresh(self.seconded_statements.get(&comparator) .expect("Statement was just inserted; qed")) @@ -886,6 +888,7 @@ mod tests { use sp_keyring::Sr25519Keyring; use node_primitives::Statement; use polkadot_primitives::parachain::{AbridgedCandidateReceipt}; + use assert_matches::assert_matches; #[test] fn active_head_accepts_only_2_seconded_per_validator() { @@ -925,12 +928,61 @@ mod tests { let mut head_data = ActiveHeadData::new(validators, session_index); - head_data.note_statement(SignedFullStatement::sign( - Statement::Seconded(candidate_a), + // note A + let a_seconded_val_0 = SignedFullStatement::sign( + Statement::Seconded(candidate_a.clone()), + &signing_context, + 0, + &Sr25519Keyring::Alice.pair().into(), + ); + let noted = head_data.note_statement(a_seconded_val_0.clone()); + + assert_matches!(noted, NotedStatement::Fresh(_)); + + // note A (duplicate) + let noted = head_data.note_statement(a_seconded_val_0); + + assert_matches!(noted, NotedStatement::UsefulButKnown); + + // note B + let noted = head_data.note_statement(SignedFullStatement::sign( + Statement::Seconded(candidate_b.clone()), + &signing_context, + 0, + &Sr25519Keyring::Alice.pair().into(), + )); + + assert_matches!(noted, NotedStatement::Fresh(_)); + + // note C (beyond 2 - ignored) + let noted = head_data.note_statement(SignedFullStatement::sign( + Statement::Seconded(candidate_c.clone()), &signing_context, 0, &Sr25519Keyring::Alice.pair().into(), )); + + assert_matches!(noted, NotedStatement::NotUseful); + + // note B (new validator) + let noted = head_data.note_statement(SignedFullStatement::sign( + Statement::Seconded(candidate_b.clone()), + &signing_context, + 1, + &Sr25519Keyring::Bob.pair().into(), + )); + + assert_matches!(noted, NotedStatement::Fresh(_)); + + // note C (new validator) + let noted = head_data.note_statement(SignedFullStatement::sign( + Statement::Seconded(candidate_c.clone()), + &signing_context, + 1, + &Sr25519Keyring::Bob.pair().into(), + )); + + assert_matches!(noted, NotedStatement::Fresh(_)); } #[test] From 5650620b7bad9bb050832c0946a76945d0da4e13 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 1 Jul 2020 21:19:17 -0400 Subject: [PATCH 21/26] test that the per-peer tracking logic works --- .../network/statement-distribution/src/lib.rs | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index d0fbaf20b621..2bbd26326347 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -987,12 +987,38 @@ mod tests { #[test] fn note_local_works() { - // TODO [now] + let hash_a: Hash = [1; 32].into(); + let hash_b: Hash = [2; 32].into(); + + let mut per_peer_tracker = VcPerPeerTracker::default(); + per_peer_tracker.note_local(hash_a.clone()); + per_peer_tracker.note_local(hash_b.clone()); + + assert!(per_peer_tracker.local_observed.contains(&hash_a)); + assert!(per_peer_tracker.local_observed.contains(&hash_b)); + + assert!(!per_peer_tracker.remote_observed.contains(&hash_a)); + assert!(!per_peer_tracker.remote_observed.contains(&hash_b)); } #[test] fn note_remote_works() { - // TODO [now] + let hash_a: Hash = [1; 32].into(); + let hash_b: Hash = [2; 32].into(); + let hash_c: Hash = [3; 32].into(); + + let mut per_peer_tracker = VcPerPeerTracker::default(); + assert!(per_peer_tracker.note_remote(hash_a.clone())); + assert!(per_peer_tracker.note_remote(hash_b.clone())); + assert!(!per_peer_tracker.note_remote(hash_c.clone())); + + assert!(per_peer_tracker.remote_observed.contains(&hash_a)); + assert!(per_peer_tracker.remote_observed.contains(&hash_b)); + assert!(!per_peer_tracker.remote_observed.contains(&hash_c)); + + assert!(!per_peer_tracker.local_observed.contains(&hash_a)); + assert!(!per_peer_tracker.local_observed.contains(&hash_b)); + assert!(!per_peer_tracker.local_observed.contains(&hash_c)); } #[test] From dd001082b595b747cb701d0682fbb5d8c62e2086 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 1 Jul 2020 21:51:22 -0400 Subject: [PATCH 22/26] test per-peer knowledge tracker --- .../network/statement-distribution/src/lib.rs | 109 +++++++++++++++++- 1 file changed, 108 insertions(+), 1 deletion(-) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 2bbd26326347..87cb05a87993 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -239,7 +239,7 @@ impl PeerRelayParentKnowledge { .entry(candidate_hash.clone()) .or_insert(0); - if *received_per_candidate + 1 >= max_message_count { + if *received_per_candidate >= max_message_count { return Err(COST_APPARENT_FLOOD); } @@ -1021,6 +1021,113 @@ mod tests { assert!(!per_peer_tracker.local_observed.contains(&hash_c)); } + #[test] + fn per_peer_relay_parent_knowledge_send() { + let mut knowledge = PeerRelayParentKnowledge::default(); + + let hash_a: Hash = [1; 32].into(); + + // Sending an un-pinned statement should not work and should have no effect. + assert!(knowledge.send(&(CompactStatement::Valid(hash_a), 0)).is_none()); + assert!(!knowledge.known_candidates.contains(&hash_a)); + assert!(knowledge.sent_statements.is_empty()); + assert!(knowledge.received_statements.is_empty()); + assert!(knowledge.seconded_counts.is_empty()); + assert!(knowledge.received_message_count.is_empty()); + + // Make the peer aware of the candidate. + assert_eq!(knowledge.send(&(CompactStatement::Candidate(hash_a), 0)), Some(true)); + assert_eq!(knowledge.send(&(CompactStatement::Candidate(hash_a), 1)), Some(false)); + assert!(knowledge.known_candidates.contains(&hash_a)); + assert_eq!(knowledge.sent_statements.len(), 2); + assert!(knowledge.received_statements.is_empty()); + assert_eq!(knowledge.seconded_counts.len(), 2); + assert!(knowledge.received_message_count.get(&hash_a).is_none()); + + // And now it should accept the dependent message. + assert_eq!(knowledge.send(&(CompactStatement::Valid(hash_a), 0)), Some(false)); + assert!(knowledge.known_candidates.contains(&hash_a)); + assert_eq!(knowledge.sent_statements.len(), 3); + assert!(knowledge.received_statements.is_empty()); + assert_eq!(knowledge.seconded_counts.len(), 2); + assert!(knowledge.received_message_count.get(&hash_a).is_none()); + } + + #[test] + fn cant_send_after_receiving() { + let mut knowledge = PeerRelayParentKnowledge::default(); + + let hash_a: Hash = [1; 32].into(); + assert!(knowledge.receive(&(CompactStatement::Candidate(hash_a), 0), 3).unwrap()); + assert!(knowledge.send(&(CompactStatement::Candidate(hash_a), 0)).is_none()); + } + + #[test] + fn per_peer_relay_parent_knowledge_receive() { + let mut knowledge = PeerRelayParentKnowledge::default(); + + let hash_a: Hash = [1; 32].into(); + + assert_eq!( + knowledge.receive(&(CompactStatement::Valid(hash_a), 0), 3), + Err(COST_UNEXPECTED_STATEMENT), + ); + + assert_eq!( + knowledge.receive(&(CompactStatement::Candidate(hash_a), 0), 3), + Ok(true), + ); + + // Push statements up to the flood limit. + assert_eq!( + knowledge.receive(&(CompactStatement::Valid(hash_a), 1), 3), + Ok(false), + ); + + assert!(knowledge.known_candidates.contains(&hash_a)); + assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 2); + + assert_eq!( + knowledge.receive(&(CompactStatement::Valid(hash_a), 2), 3), + Ok(false), + ); + + assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 3); + + assert_eq!( + knowledge.receive(&(CompactStatement::Valid(hash_a), 7), 3), + Err(COST_APPARENT_FLOOD), + ); + + assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 3); + assert_eq!(knowledge.received_statements.len(), 3); // number of prior `Ok`s. + + // Now make sure that the seconding limit is respected. + let hash_b: Hash = [2; 32].into(); + let hash_c: Hash = [3; 32].into(); + + assert_eq!( + knowledge.receive(&(CompactStatement::Candidate(hash_b), 0), 3), + Ok(true), + ); + + assert_eq!( + knowledge.receive(&(CompactStatement::Candidate(hash_c), 0), 3), + Err(COST_UNEXPECTED_STATEMENT), + ); + + // Last, make sure that already-known statements are disregarded. + assert_eq!( + knowledge.receive(&(CompactStatement::Valid(hash_a), 2), 3), + Err(COST_DUPLICATE_STATEMENT), + ); + + assert_eq!( + knowledge.receive(&(CompactStatement::Candidate(hash_b), 0), 3), + Err(COST_DUPLICATE_STATEMENT), + ); + } + #[test] fn peer_view_update_sends_messages() { // TODO [now] From c93b545abd19dd370c44aa5fa4109a4fb2258ef4 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 1 Jul 2020 22:15:16 -0400 Subject: [PATCH 23/26] test that peer view updates lead to messages being sent --- .../network/statement-distribution/src/lib.rs | 133 +++++++++++++++++- 1 file changed, 132 insertions(+), 1 deletion(-) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 87cb05a87993..d3b5158e0c28 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -889,6 +889,7 @@ mod tests { use node_primitives::Statement; use polkadot_primitives::parachain::{AbridgedCandidateReceipt}; use assert_matches::assert_matches; + use futures::executor::{self, ThreadPool}; #[test] fn active_head_accepts_only_2_seconded_per_validator() { @@ -1130,7 +1131,137 @@ mod tests { #[test] fn peer_view_update_sends_messages() { - // TODO [now] + let hash_a = [1; 32].into(); + let hash_b = [2; 32].into(); + let hash_c = [3; 32].into(); + + let candidate = { + let mut c = AbridgedCandidateReceipt::default(); + c.relay_parent = hash_c; + c.parachain_index = 1.into(); + c + }; + let candidate_hash = candidate.hash(); + + let old_view = View(vec![hash_a, hash_b]); + let new_view = View(vec![hash_b, hash_c]); + + let mut active_heads = HashMap::new(); + let validators = vec![ + Sr25519Keyring::Alice.public().into(), + Sr25519Keyring::Bob.public().into(), + Sr25519Keyring::Charlie.public().into(), + ]; + + let session_index = 1; + let signing_context = SigningContext { + parent_hash: hash_c, + session_index, + }; + + let new_head_data = { + let mut data = ActiveHeadData::new(validators, session_index); + + let noted = data.note_statement(SignedFullStatement::sign( + Statement::Seconded(candidate.clone()), + &signing_context, + 0, + &Sr25519Keyring::Alice.pair().into(), + )); + + assert_matches!(noted, NotedStatement::Fresh(_)); + + let noted = data.note_statement(SignedFullStatement::sign( + Statement::Valid(candidate_hash), + &signing_context, + 1, + &Sr25519Keyring::Bob.pair().into(), + )); + + assert_matches!(noted, NotedStatement::Fresh(_)); + + let noted = data.note_statement(SignedFullStatement::sign( + Statement::Valid(candidate_hash), + &signing_context, + 2, + &Sr25519Keyring::Charlie.pair().into(), + )); + + assert_matches!(noted, NotedStatement::Fresh(_)); + + data + }; + + active_heads.insert(hash_c, new_head_data); + + let mut peer_data = PeerData { + view: old_view, + view_knowledge: { + let mut k = HashMap::new(); + + k.insert(hash_a, Default::default()); + k.insert(hash_b, Default::default()); + + k + }, + }; + + let pool = ThreadPool::new().unwrap(); + let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + let peer = PeerId::random(); + + executor::block_on(async move { + update_peer_view_and_send_unlocked( + peer.clone(), + &mut peer_data, + &mut ctx, + &active_heads, + new_view.clone(), + ).await.unwrap(); + + assert_eq!(peer_data.view, new_view); + assert!(!peer_data.view_knowledge.contains_key(&hash_a)); + assert!(peer_data.view_knowledge.contains_key(&hash_b)); + + let c_knowledge = peer_data.view_knowledge.get(&hash_c).unwrap(); + + assert!(c_knowledge.known_candidates.contains(&candidate_hash)); + assert!(c_knowledge.sent_statements.contains( + &(CompactStatement::Candidate(candidate_hash), 0) + )); + assert!(c_knowledge.sent_statements.contains( + &(CompactStatement::Valid(candidate_hash), 1) + )); + assert!(c_knowledge.sent_statements.contains( + &(CompactStatement::Valid(candidate_hash), 2) + )); + + // now see if we got the 3 messages from the active head data. + let active_head = active_heads.get(&hash_c).unwrap(); + + // semi-fragile because hashmap iterator ordering is undefined, but in practice + // it will not change between runs of the program. + for statement in active_head.statements_about(candidate_hash) { + let message = handle.recv().await; + let expected_to = vec![peer.clone()]; + let expected_protocol = PROTOCOL_V1; + let expected_payload + = WireMessage::Statement(hash_c, statement.statement.clone()).encode(); + + assert_matches!( + message, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( + to, + protocol, + payload, + )) => { + assert_eq!(to, expected_to); + assert_eq!(protocol, expected_protocol); + assert_eq!(payload, expected_payload) + } + ) + } + }); } #[test] From 19ee50560f66d50ba4fd756bfe2c09b33c2019ad Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 1 Jul 2020 22:37:01 -0400 Subject: [PATCH 24/26] test statement circulation --- .../network/statement-distribution/src/lib.rs | 110 +++++++++++++++++- 1 file changed, 105 insertions(+), 5 deletions(-) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index d3b5158e0c28..1c4c8482de30 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -1266,11 +1266,111 @@ mod tests { #[test] fn circulated_statement_goes_to_all_peers_with_view() { - // TODO [now] - } + let hash_a = [1; 32].into(); + let hash_b = [2; 32].into(); + let hash_c = [3; 32].into(); - #[test] - fn smoke() { - // TODO [now] + let candidate = { + let mut c = AbridgedCandidateReceipt::default(); + c.relay_parent = hash_b; + c.parachain_index = 1.into(); + c + }; + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + let peer_c = PeerId::random(); + + let peer_a_view = View(vec![hash_a]); + let peer_b_view = View(vec![hash_a, hash_b]); + let peer_c_view = View(vec![hash_b, hash_c]); + + let session_index = 1; + + let peer_data_from_view = |view: View| PeerData { + view: view.clone(), + view_knowledge: view.0.iter().map(|v| (v.clone(), Default::default())).collect(), + }; + + let mut peer_data: HashMap<_, _> = vec![ + (peer_a.clone(), peer_data_from_view(peer_a_view)), + (peer_b.clone(), peer_data_from_view(peer_b_view)), + (peer_c.clone(), peer_data_from_view(peer_c_view)), + ].into_iter().collect(); + + let pool = ThreadPool::new().unwrap(); + let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + + executor::block_on(async move { + let statement = { + let signing_context = SigningContext { + parent_hash: hash_b, + session_index, + }; + + let statement = SignedFullStatement::sign( + Statement::Seconded(candidate), + &signing_context, + 0, + &Sr25519Keyring::Alice.pair().into(), + ); + + StoredStatement { + comparator: StoredStatementComparator { + compact: statement.payload().to_compact(), + validator_index: 0, + signature: statement.signature().clone() + }, + statement, + } + }; + + let needs_dependents = circulate_statement( + &mut peer_data, + &mut ctx, + hash_b, + &statement, + ).await.unwrap(); + + { + assert_eq!(needs_dependents.len(), 2); + assert!(needs_dependents.contains(&peer_b)); + assert!(needs_dependents.contains(&peer_c)); + } + + let fingerprint = (statement.compact().clone(), 0); + + assert!( + peer_data.get(&peer_b).unwrap() + .view_knowledge.get(&hash_b).unwrap() + .sent_statements.contains(&fingerprint), + ); + + assert!( + peer_data.get(&peer_c).unwrap() + .view_knowledge.get(&hash_b).unwrap() + .sent_statements.contains(&fingerprint), + ); + + let message = handle.recv().await; + assert_matches!( + message, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( + to, + protocol, + payload, + )) => { + assert_eq!(to.len(), 2); + assert!(to.contains(&peer_b)); + assert!(to.contains(&peer_c)); + + assert_eq!(protocol, PROTOCOL_V1); + assert_eq!( + payload, + WireMessage::Statement(hash_b, statement.statement.clone()).encode(), + ); + } + ) + }); } } From 9093736344b04e55f320bdb5d91a58081ee97062 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 2 Jul 2020 11:52:33 -0400 Subject: [PATCH 25/26] address review comments --- Cargo.lock | 1 + .../network/statement-distribution/Cargo.toml | 1 + .../network/statement-distribution/src/lib.rs | 58 ++++++++++--------- 3 files changed, 33 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 80ebc2abb7db..d1f2138b1ee6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4793,6 +4793,7 @@ dependencies = [ "assert_matches", "futures 0.3.5", "futures-timer 3.0.2", + "indexmap", "log 0.4.8", "parity-scale-codec", "parking_lot 0.10.2", diff --git a/node/network/statement-distribution/Cargo.toml b/node/network/statement-distribution/Cargo.toml index a76efab573d9..2f8da8ee3d85 100644 --- a/node/network/statement-distribution/Cargo.toml +++ b/node/network/statement-distribution/Cargo.toml @@ -17,6 +17,7 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } arrayvec = "0.5.1" +indexmap = "1.4.0" [dev-dependencies] parking_lot = "0.10.0" diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 1c4c8482de30..c8163f9629e3 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -37,6 +37,7 @@ use parity_scale_codec::{Encode, Decode}; use futures::prelude::*; use futures::channel::oneshot; +use indexmap::IndexSet; use std::collections::{HashMap, HashSet}; @@ -124,21 +125,21 @@ fn note_hash( } } -// knowledge that a peer has about goings-on in a relay parent. +/// knowledge that a peer has about goings-on in a relay parent. #[derive(Default)] struct PeerRelayParentKnowledge { - // candidates that the peer is aware of. This indicates that we can - // send other statements pertaining to that candidate. + /// candidates that the peer is aware of. This indicates that we can + /// send other statements pertaining to that candidate. known_candidates: HashSet, - // fingerprints of all statements a peer should be aware of: those that - // were sent to the peer by us. + /// fingerprints of all statements a peer should be aware of: those that + /// were sent to the peer by us. sent_statements: HashSet<(CompactStatement, ValidatorIndex)>, - // fingerprints of all statements a peer should be aware of: those that - // were sent to us by the peer. + /// fingerprints of all statements a peer should be aware of: those that + /// were sent to us by the peer. received_statements: HashSet<(CompactStatement, ValidatorIndex)>, - // How many candidates this peer is aware of for each given validator index. + /// How many candidates this peer is aware of for each given validator index. seconded_counts: HashMap, - // How many statements we've received for each candidate that we're aware of. + /// How many statements we've received for each candidate that we're aware of. received_message_count: HashMap, } @@ -363,10 +364,11 @@ enum NotedStatement<'a> { struct ActiveHeadData { /// All candidates we are aware of for this head, keyed by hash. candidates: HashSet, - /// Stored seconded statements for circulation to peers. - seconded_statements: HashSet, - /// Stored other statements for circulation to peers. - other_statements: HashSet, + /// Stored statements for circulation to peers. + /// + /// These are iterable in insertion order, and `Seconded` statements are always + /// accepted before dependent statements. + statements: IndexSet, /// The validators at this head. validators: Vec, /// The session index this head is at. @@ -379,8 +381,7 @@ impl ActiveHeadData { fn new(validators: Vec, session_index: sp_staking::SessionIndex) -> Self { ActiveHeadData { candidates: Default::default(), - seconded_statements: Default::default(), - other_statements: Default::default(), + statements: Default::default(), validators, session_index, seconded_counts: Default::default(), @@ -389,15 +390,18 @@ impl ActiveHeadData { /// Note the given statement. /// - /// If it was not already known and can be accepted, returns `Some`, + /// If it was not already known and can be accepted, returns `NotedStatement::Fresh`, /// with a handle to the statement. /// + /// If it can be accepted, but we already know it, returns `NotedStatement::UsefulButKnown`. + /// /// We accept up to `VC_THRESHOLD` (2 at time of writing) `Seconded` statements /// per validator. These will be the first ones we see. The statement is assumed /// to have been checked, including that the validator index is not out-of-bounds and /// the signature is valid. /// - /// Any other statements or those that reference a candidate we are not aware of cannot be accepted. + /// Any other statements or those that reference a candidate we are not aware of cannot be accepted + /// and will return `NotedStatement::NotUseful`. fn note_statement(&mut self, statement: SignedFullStatement) -> NotedStatement { let validator_index = statement.validator_index(); let comparator = StoredStatementComparator { @@ -414,16 +418,16 @@ impl ActiveHeadData { match comparator.compact { CompactStatement::Candidate(h) => { let seconded_so_far = self.seconded_counts.entry(validator_index).or_insert(0); - if *seconded_so_far >= 2 { + if *seconded_so_far >= VC_THRESHOLD { return NotedStatement::NotUseful; } self.candidates.insert(h); - if self.seconded_statements.insert(stored) { + if self.statements.insert(stored) { *seconded_so_far += 1; // This will always return `Some` because it was just inserted. - NotedStatement::Fresh(self.seconded_statements.get(&comparator) + NotedStatement::Fresh(self.statements.get(&comparator) .expect("Statement was just inserted; qed")) } else { NotedStatement::UsefulButKnown @@ -434,9 +438,9 @@ impl ActiveHeadData { return NotedStatement::NotUseful; } - if self.other_statements.insert(stored) { + if self.statements.insert(stored) { // This will always return `Some` because it was just inserted. - NotedStatement::Fresh(self.other_statements.get(&comparator) + NotedStatement::Fresh(self.statements.get(&comparator) .expect("Statement was just inserted; qed")) } else { NotedStatement::UsefulButKnown @@ -447,7 +451,7 @@ impl ActiveHeadData { /// Get an iterator over all statements for the active head. Seconded statements come first. fn statements(&self) -> impl Iterator + '_ { - self.seconded_statements.iter().chain(self.other_statements.iter()) + self.statements.iter() } /// Get an iterator over all statements for the active head that are for a particular candidate. @@ -469,15 +473,15 @@ fn check_statement_signature( parent_hash: relay_parent, }; - match head.validators.get(statement.validator_index() as usize) { - None => return Err(()), - Some(v) => statement.check_signature(&signing_context, v), - } + head.validators.get(statement.validator_index() as usize) + .ok_or(()) + .and_then(|v| statement.check_signature(&signing_context, v)) } #[derive(Encode, Decode)] enum WireMessage { /// relay-parent, full statement. + #[codec(index = "0")] Statement(Hash, SignedFullStatement), } From 6d18b622007058f0b5b2e31595d0a3c8caf77a5b Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 2 Jul 2020 11:53:36 -0400 Subject: [PATCH 26/26] have view set methods return references --- node/network/statement-distribution/src/lib.rs | 4 ++-- node/primitives/src/lib.rs | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index c8163f9629e3..f3d2653266f7 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -720,12 +720,12 @@ async fn update_peer_view_and_send_unlocked( // Remove entries for all relay-parents in the old view but not the new. for removed in old_view.difference(&peer_data.view) { - let _ = peer_data.view_knowledge.remove(&removed); + let _ = peer_data.view_knowledge.remove(removed); } // Add entries for all relay-parents in the new view but not the old. // Furthermore, send all statements we have for those relay parents. - let new_view = peer_data.view.difference(&old_view).collect::>(); + let new_view = peer_data.view.difference(&old_view).copied().collect::>(); for new in new_view.iter().copied() { peer_data.view_knowledge.insert(new, Default::default()); diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index 173a9b7d920a..921f620ea9dc 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -95,13 +95,13 @@ pub struct View(pub Vec); impl View { /// Returns an iterator of the hashes present in `Self` but not in `other`. - pub fn difference<'a>(&'a self, other: &'a View) -> impl Iterator + 'a { - self.0.iter().cloned().filter(move |h| !other.contains(h)) + pub fn difference<'a>(&'a self, other: &'a View) -> impl Iterator + 'a { + self.0.iter().filter(move |h| !other.contains(h)) } /// An iterator containing hashes present in both `Self` and in `other`. - pub fn intersection<'a>(&'a self, other: &'a View) -> impl Iterator + 'a { - self.0.iter().cloned().filter(move |h| other.contains(h)) + pub fn intersection<'a>(&'a self, other: &'a View) -> impl Iterator + 'a { + self.0.iter().filter(move |h| other.contains(h)) } /// Whether the view contains a given hash.