Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Batch vote import in dispute-distribution #5894

Merged
merged 45 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
b202cc8
Start work on batching in dispute-distribution.
eskimor Aug 16, 2022
192d73b
Guide work.
eskimor Aug 17, 2022
a2f8fde
More guide changes. Still very much WIP.
eskimor Aug 18, 2022
5e2f4a5
Finish guide changes.
eskimor Aug 22, 2022
585ec6d
Clarification
eskimor Aug 22, 2022
7e02910
Adjust argument about slashing.
eskimor Aug 23, 2022
4e170e2
WIP: Add constants to receiver.
eskimor Aug 23, 2022
362ebae
Maintain order of disputes.
eskimor Aug 24, 2022
68c7073
dispute-distribuion sender Rate limit.
eskimor Aug 24, 2022
56519a7
Cleanup
eskimor Aug 25, 2022
b3ab280
WIP: dispute-distribution receiver.
eskimor Aug 25, 2022
4b4df00
WIP: Batching.
eskimor Aug 25, 2022
5816c8d
fmt
eskimor Aug 25, 2022
a821efc
Update `PeerQueues` to maintain more invariants.
eskimor Aug 26, 2022
3b71817
WIP: Batching.
eskimor Aug 26, 2022
909569b
Small cleanup
eskimor Aug 26, 2022
fa14c43
Batching logic.
eskimor Aug 30, 2022
89d622d
Some integration work.
eskimor Aug 30, 2022
401ffb8
Finish.
eskimor Aug 31, 2022
3dd2041
Typo.
eskimor Sep 1, 2022
f9fc6c8
Docs.
eskimor Sep 1, 2022
43d946d
Report missing metric.
eskimor Sep 1, 2022
3e5ceaa
Doc pass.
eskimor Sep 1, 2022
0e11f4f
Tests for waiting_queue.
eskimor Sep 2, 2022
2fcebad
Speed up some crypto by 10x.
eskimor Sep 2, 2022
3d444ae
Fix redundant import.
eskimor Sep 2, 2022
e7923a7
Add some tracing.
eskimor Sep 2, 2022
8b8f722
Better sender rate limit
eskimor Sep 2, 2022
a79a96d
Some tests.
eskimor Sep 2, 2022
c1ed615
Tests
eskimor Sep 5, 2022
f4c530c
Add logging to rate limiter
eskimor Sep 5, 2022
795c6c1
Update roadmap/implementers-guide/src/node/disputes/dispute-distribut…
eskimor Sep 8, 2022
d46be76
Update roadmap/implementers-guide/src/node/disputes/dispute-distribut…
eskimor Sep 8, 2022
9b8787d
Update node/network/dispute-distribution/src/receiver/mod.rs
eskimor Sep 8, 2022
9795098
Review feedback.
eskimor Sep 9, 2022
465f266
Also log peer in log messages.
eskimor Sep 21, 2022
88c61d2
Fix indentation.
eskimor Sep 28, 2022
78066ac
waker -> timer
eskimor Sep 28, 2022
3e43b0a
Guide improvement.
eskimor Sep 28, 2022
f0e5001
Remove obsolete comment.
eskimor Sep 28, 2022
30d10f4
waker -> timer
eskimor Oct 4, 2022
8ef4a24
Fix spell complaints.
eskimor Oct 4, 2022
5b5d82d
Merge branch 'master' into rk-batch-vote-import
eskimor Oct 4, 2022
f0a054b
Merge branch 'master' into rk-batch-vote-import
eskimor Oct 4, 2022
35d698c
Fix Cargo.lock
eskimor Oct 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ maintenance = { status = "actively-developed" }
#
# This list is ordered alphabetically.
[profile.dev.package]
blake2b_simd = { opt-level = 3 }
blake2 = { opt-level = 3 }
blake2-rfc = { opt-level = 3 }
blake2b_simd = { opt-level = 3 }
chacha20poly1305 = { opt-level = 3 }
cranelift-codegen = { opt-level = 3 }
cranelift-wasm = { opt-level = 3 }
Expand All @@ -141,8 +141,8 @@ curve25519-dalek = { opt-level = 3 }
ed25519-dalek = { opt-level = 3 }
flate2 = { opt-level = 3 }
futures-channel = { opt-level = 3 }
hashbrown = { opt-level = 3 }
hash-db = { opt-level = 3 }
hashbrown = { opt-level = 3 }
hmac = { opt-level = 3 }
httparse = { opt-level = 3 }
integer-sqrt = { opt-level = 3 }
Expand All @@ -154,8 +154,8 @@ libz-sys = { opt-level = 3 }
mio = { opt-level = 3 }
nalgebra = { opt-level = 3 }
num-bigint = { opt-level = 3 }
parking_lot_core = { opt-level = 3 }
parking_lot = { opt-level = 3 }
parking_lot_core = { opt-level = 3 }
percent-encoding = { opt-level = 3 }
primitive-types = { opt-level = 3 }
reed-solomon-novelpoly = { opt-level = 3 }
Expand All @@ -165,6 +165,7 @@ sha2 = { opt-level = 3 }
sha3 = { opt-level = 3 }
smallvec = { opt-level = 3 }
snow = { opt-level = 3 }
substrate-bip39 = {opt-level = 3}
twox-hash = { opt-level = 3 }
uint = { opt-level = 3 }
wasmi = { opt-level = 3 }
Expand Down
2 changes: 2 additions & 0 deletions node/network/dispute-distribution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"

[dependencies]
futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }
derive_more = "0.99.17"
parity-scale-codec = { version = "3.1.5", features = ["std"] }
Expand All @@ -21,6 +22,7 @@ sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "maste
thiserror = "1.0.31"
fatality = "0.0.6"
lru = "0.7.7"
indexmap = "1.9.1"

[dev-dependencies]
async-trait = "0.1.53"
Expand Down
38 changes: 32 additions & 6 deletions node/network/dispute-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
//! The sender is responsible for getting our vote out, see [`sender`]. The receiver handles
//! incoming [`DisputeRequest`]s and offers spam protection, see [`receiver`].

use std::time::Duration;

use futures::{channel::mpsc, FutureExt, StreamExt, TryFutureExt};

use polkadot_node_network_protocol::authority_discovery::AuthorityDiscovery;
Expand Down Expand Up @@ -64,16 +66,19 @@ use self::sender::{DisputeSender, TaskFinish};
/// via a dedicated channel and forwarding them to the dispute coordinator via
/// `DisputeCoordinatorMessage::ImportStatements`. Being the interface to the network and untrusted
/// nodes, the reality is not that simple of course. Before importing statements the receiver will
/// make sure as good as it can to filter out malicious/unwanted/spammy requests. For this it does
/// the following:
/// batch up imports as well as possible for efficient imports while maintaining timely dispute
/// resolution and handling of spamming validators:
///
/// - Drop all messages from non validator nodes, for this it requires the [`AuthorityDiscovery`]
/// service.
/// - Drop messages from a node, if we are already importing a message from that node (flood).
/// - Drop messages from nodes, that provided us messages where the statement import failed.
/// - Drop messages from a node, if it sends at a too high rate.
/// - Filter out duplicate messages (over some period of time).
/// - Drop any obviously invalid votes (invalid signatures for example).
/// - Ban peers whose votes were deemed invalid.
///
/// In general dispute-distribution works on limiting the work the dispute-coordinator will have to
/// do, while at the same time making it aware of new disputes as fast as possible.
///
/// For successfully imported votes, we will confirm the receipt of the message back to the sender.
/// This way a received confirmation guarantees, that the vote has been stored to disk by the
/// receiver.
Expand All @@ -93,6 +98,20 @@ pub use metrics::Metrics;

const LOG_TARGET: &'static str = "parachain::dispute-distribution";

/// Rate limit on the `receiver` side.
///
/// If messages from one peer come in at a higher rate than every `RECEIVE_RATE_LIMIT` on average, we
/// start dropping messages from that peer to enforce that limit.
pub const RECEIVE_RATE_LIMIT: Duration = Duration::from_millis(100);

/// Rate limit on the `sender` side.
///
/// In order to not hit the `RECEIVE_RATE_LIMIT` on the receiving side, we limit out sending rate as
/// well.
///
/// We add 50ms extra, just to have some save margin to the `RECEIVE_RATE_LIMIT`.
pub const SEND_RATE_LIMIT: Duration = RECEIVE_RATE_LIMIT.saturating_add(Duration::from_millis(50));

/// The dispute distribution subsystem.
pub struct DisputeDistributionSubsystem<AD> {
/// Easy and efficient runtime access for this subsystem.
Expand Down Expand Up @@ -172,6 +191,12 @@ where
ctx.spawn("disputes-receiver", receiver.run().boxed())
.map_err(FatalError::SpawnTask)?;

// Process messages for sending side.
//
// Note: We want the sender to be rate limited and we are currently taking advantage of the
// fact that the root task of this subsystem is only concerned with sending: Functions of
// `DisputeSender` might back pressure if the rate limit is hit, which will slow down this
// loop. If this fact ever changes, we will likely need another task.
loop {
let message = MuxedMessage::receive(&mut ctx, &mut self.sender_rx).await;
match message {
Expand Down Expand Up @@ -247,9 +272,10 @@ impl MuxedMessage {
// ends.
let from_overseer = ctx.recv().fuse();
futures::pin_mut!(from_overseer, from_sender);
futures::select!(
msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
// We select biased to make sure we finish up loose ends, before starting new work.
futures::select_biased!(
eskimor marked this conversation as resolved.
Show resolved Hide resolved
msg = from_sender.next() => MuxedMessage::Sender(msg),
msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
)
}
}
Expand Down
7 changes: 5 additions & 2 deletions node/network/dispute-distribution/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,12 @@ impl Metrics {
}

/// Statements have been imported.
pub fn on_imported(&self, label: &'static str) {
pub fn on_imported(&self, label: &'static str, num_requests: usize) {
if let Some(metrics) = &self.0 {
metrics.imported_requests.with_label_values(&[label]).inc()
metrics
.imported_requests
.with_label_values(&[label])
.inc_by(num_requests as u64)
}
}

Expand Down
Loading