Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Extract BeefyGossipValidator (#147)
Browse files Browse the repository at this point in the history
* Extract BeefyGossipValidator

* Apply review suggestions
  • Loading branch information
adoerr authored Apr 14, 2021
1 parent 75ea15d commit 9da8233
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 135 deletions.
3 changes: 2 additions & 1 deletion client/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use beefy_primitives::BeefyApi;
mod error;
mod metrics;
mod round;
mod validator;
mod worker;

pub mod notification;
Expand Down Expand Up @@ -105,7 +106,7 @@ pub async fn start_beefy_gadget<B, P, BE, C, N, SO>(
N: GossipNetwork<B> + Clone + Send + 'static,
SO: SyncOracleT + Send + 'static,
{
let gossip_validator = Arc::new(worker::BeefyGossipValidator::new());
let gossip_validator = Arc::new(validator::BeefyGossipValidator::new());
let gossip_engine = GossipEngine::new(network, BEEFY_PROTOCOL_NAME, gossip_validator.clone(), None);

let metrics = prometheus_registry
Expand Down
143 changes: 143 additions & 0 deletions client/beefy/src/validator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright (C) 2020-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

/// The maximum number of live gossip rounds allowed, i.e. we will expire messages older than this.
use std::{fmt::Debug, marker::PhantomData};

use codec::{Decode, Encode};
use log::debug;
use parking_lot::RwLock;

use sc_network::PeerId;
use sc_network_gossip::{
MessageIntent, ValidationResult as GossipValidationResult, Validator as GossipValidator,
ValidatorContext as GossipValidatorContext,
};

use sp_core::Pair;
use sp_runtime::traits::{Block, Hash, Header, NumberFor};

use beefy_primitives::{MmrRootHash, VoteMessage};

// Limit BEEFY gossip by keeping only a bound number of voting rounds alive.
const MAX_LIVE_GOSSIP_ROUNDS: usize = 5;

/// Gossip engine messages topic
pub(crate) fn topic<B: Block>() -> B::Hash
where
B: Block,
{
<<B::Header as Header>::Hashing as Hash>::hash(b"beefy")
}

/// BEEFY gossip validator
///
/// Validate BEEFY gossip messages and limit the number of live BEEFY voting rounds.
///
/// Allows messages from last [`MAX_LIVE_GOSSIP_ROUNDS`] to flow, everything else gets
/// rejected/expired.
///
///All messaging is handled in a single BEEFY global topic.
pub(crate) struct BeefyGossipValidator<B, P>
where
B: Block,
{
topic: B::Hash,
live_rounds: RwLock<Vec<NumberFor<B>>>,
_pair: PhantomData<P>,
}

impl<B, P> BeefyGossipValidator<B, P>
where
B: Block,
{
pub fn new() -> BeefyGossipValidator<B, P> {
BeefyGossipValidator {
topic: topic::<B>(),
live_rounds: RwLock::new(Vec::new()),
_pair: PhantomData,
}
}

pub(crate) fn note_round(&self, round: NumberFor<B>) {
let mut live_rounds = self.live_rounds.write();

// NOTE: ideally we'd use a VecDeque here, but currently binary search is only available on
// nightly for `VecDeque`.
while live_rounds.len() > MAX_LIVE_GOSSIP_ROUNDS {
let _ = live_rounds.remove(0);
}

if let Some(idx) = live_rounds.binary_search(&round).err() {
live_rounds.insert(idx, round);
}
}

fn is_live(live_rounds: &[NumberFor<B>], round: NumberFor<B>) -> bool {
live_rounds.binary_search(&round).is_ok()
}
}

impl<B, P> GossipValidator<B> for BeefyGossipValidator<B, P>
where
B: Block,
P: Pair,
P::Public: Debug + Decode,
P::Signature: Debug + Decode,
{
fn validate(
&self,
_context: &mut dyn GossipValidatorContext<B>,
sender: &sc_network::PeerId,
mut data: &[u8],
) -> GossipValidationResult<B::Hash> {
if let Ok(msg) = VoteMessage::<MmrRootHash, NumberFor<B>, P::Public, P::Signature>::decode(&mut data) {
if P::verify(&msg.signature, &msg.commitment.encode(), &msg.id) {
return GossipValidationResult::ProcessAndKeep(self.topic);
} else {
// TODO: report peer
debug!(target: "beefy", "🥩 Bad signature on message: {:?}, from: {:?}", msg, sender);
}
}

GossipValidationResult::Discard
}

fn message_expired<'a>(&'a self) -> Box<dyn FnMut(B::Hash, &[u8]) -> bool + 'a> {
let live_rounds = self.live_rounds.read();
Box::new(move |_topic, mut data| {
let message = match VoteMessage::<MmrRootHash, NumberFor<B>, P::Public, P::Signature>::decode(&mut data) {
Ok(vote) => vote,
Err(_) => return true,
};

!BeefyGossipValidator::<B, P>::is_live(&live_rounds, message.commitment.block_number)
})
}

#[allow(clippy::type_complexity)]
fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &B::Hash, &[u8]) -> bool + 'a> {
let live_rounds = self.live_rounds.read();
Box::new(move |_who, _intent, _topic, mut data| {
let message = match VoteMessage::<MmrRootHash, NumberFor<B>, P::Public, P::Signature>::decode(&mut data) {
Ok(vote) => vote,
Err(_) => return true,
};

BeefyGossipValidator::<B, P>::is_live(&live_rounds, message.commitment.block_number)
})
}
}
155 changes: 21 additions & 134 deletions client/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,145 +25,34 @@ use codec::{Codec, Decode, Encode};
use futures::{future, FutureExt, StreamExt};
use hex::ToHex;
use log::{debug, error, info, trace, warn};
use parking_lot::{Mutex, RwLock};
use parking_lot::Mutex;

use sc_client_api::{Backend, FinalityNotification, FinalityNotifications};
use sc_network::PeerId;
use sc_network_gossip::{
GossipEngine, MessageIntent, ValidationResult as GossipValidationResult, Validator as GossipValidator,
ValidatorContext as GossipValidatorContext,
};
use sc_network_gossip::GossipEngine;

use sp_api::BlockId;
use sp_application_crypto::{AppPublic, Public};
use sp_core::Pair;
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::{
generic::OpaqueDigestItemId,
traits::{Block, Hash, Header, NumberFor, Zero},
traits::{Block, Header, NumberFor, Zero},
};

use crate::{
error::{self},
metrics::Metrics,
notification, round, Client,
};
use beefy_primitives::{
BeefyApi, Commitment, ConsensusLog, MmrRootHash, SignedCommitment, ValidatorSet, BEEFY_ENGINE_ID,
BeefyApi, Commitment, ConsensusLog, MmrRootHash, SignedCommitment, ValidatorSet, VoteMessage, BEEFY_ENGINE_ID,
GENESIS_AUTHORITY_SET_ID, KEY_TYPE,
};

/// The maximum number of live gossip rounds allowed, i.e. we will expire messages older than this.
const MAX_LIVE_GOSSIP_ROUNDS: usize = 5;

/// Gossip engine messages topic
pub(crate) fn topic<B: Block>() -> B::Hash
where
B: Block,
{
<<B::Header as Header>::Hashing as Hash>::hash(b"beefy")
}

/// Allows messages from last [`MAX_LIVE_GOSSIP_ROUNDS`] to flow, everything else gets
/// rejected/expired. All messaging is handled in a single global topic.
pub struct BeefyGossipValidator<B, P>
where
B: Block,
{
topic: B::Hash,
live_rounds: RwLock<Vec<NumberFor<B>>>,
_pair: PhantomData<P>,
}

impl<B, P> BeefyGossipValidator<B, P>
where
B: Block,
{
pub fn new() -> BeefyGossipValidator<B, P> {
BeefyGossipValidator {
topic: topic::<B>(),
live_rounds: RwLock::new(Vec::new()),
_pair: PhantomData,
}
}

fn note_round(&self, round: NumberFor<B>) {
let mut live_rounds = self.live_rounds.write();

// NOTE: ideally we'd use a VecDeque here, but currently binary search is only available on
// nightly for `VecDeque`.
while live_rounds.len() > MAX_LIVE_GOSSIP_ROUNDS {
let _ = live_rounds.remove(0);
}

if let Some(idx) = live_rounds.binary_search(&round).err() {
live_rounds.insert(idx, round);
}
}

fn is_live(live_rounds: &[NumberFor<B>], round: NumberFor<B>) -> bool {
live_rounds.binary_search(&round).is_ok()
}
}

impl<B, P> GossipValidator<B> for BeefyGossipValidator<B, P>
where
B: Block,
P: Pair,
P::Public: Debug + Decode,
P::Signature: Debug + Decode,
{
fn validate(
&self,
_context: &mut dyn GossipValidatorContext<B>,
sender: &sc_network::PeerId,
mut data: &[u8],
) -> GossipValidationResult<B::Hash> {
if let Ok(msg) = VoteMessage::<MmrRootHash, NumberFor<B>, P::Public, P::Signature>::decode(&mut data) {
if P::verify(&msg.signature, &msg.commitment.encode(), &msg.id) {
return GossipValidationResult::ProcessAndKeep(self.topic);
} else {
// TODO: report peer
debug!(target: "beefy", "🥩 Bad signature on message: {:?}, from: {:?}", msg, sender);
}
}

GossipValidationResult::Discard
}

fn message_expired<'a>(&'a self) -> Box<dyn FnMut(B::Hash, &[u8]) -> bool + 'a> {
let live_rounds = self.live_rounds.read();
Box::new(move |_topic, mut data| {
let message = match VoteMessage::<MmrRootHash, NumberFor<B>, P::Public, P::Signature>::decode(&mut data) {
Ok(vote) => vote,
Err(_) => return true,
};

!BeefyGossipValidator::<B, P>::is_live(&live_rounds, message.commitment.block_number)
})
}

#[allow(clippy::type_complexity)]
fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &B::Hash, &[u8]) -> bool + 'a> {
let live_rounds = self.live_rounds.read();
Box::new(move |_who, _intent, _topic, mut data| {
let message = match VoteMessage::<MmrRootHash, NumberFor<B>, P::Public, P::Signature>::decode(&mut data) {
Ok(vote) => vote,
Err(_) => return true,
};

BeefyGossipValidator::<B, P>::is_live(&live_rounds, message.commitment.block_number)
})
}
}

#[derive(Debug, Decode, Encode)]
struct VoteMessage<Hash, Number, Id, Signature> {
commitment: Commitment<Number, Hash>,
id: Id,
signature: Signature,
}
use crate::{
error::{self},
metrics::Metrics,
notification, round,
validator::{topic, BeefyGossipValidator},
Client,
};

/// A BEEFY worker plays the BEEFY protocol
pub(crate) struct BeefyWorker<B, C, BE, P>
where
B: Block,
Expand Down Expand Up @@ -202,15 +91,12 @@ where
C: Client<B, BE, P>,
C::Api: BeefyApi<B, P::Public>,
{
/// Retrun a new BEEFY worker instance.
/// Return a new BEEFY worker instance.
///
/// Note that full BEEFY worker initialization can only be completed, if an
/// on-chain BEEFY pallet is available. Reason is that the current active
/// validator set has to be fetched from the on-chain BEFFY pallet.
/// Note that a BEEFY worker is only fully functional if a corresponding
/// BEEFY pallet has been deployed on-chain.
///
/// For this reason, BEEFY worker initialization completes only after a finality
/// notification has been received. Such a notifcation is basically an indication
/// that an on-chain BEEFY pallet may be available.
/// The BEEFY pallet is needed in order to keep track of the BEEFY authority set.
pub(crate) fn new(
client: Arc<C>,
key_store: SyncCryptoStorePtr,
Expand Down Expand Up @@ -248,7 +134,7 @@ where
C: Client<B, BE, P>,
C::Api: BeefyApi<B, P::Public>,
{
/// Return `true`, if the should vote on block `number`
/// Return `true`, if we should vote on block `number`
fn should_vote_on(&self, number: NumberFor<B>) -> bool {
use sp_runtime::{traits::Saturating, SaturatedConversion};

Expand Down Expand Up @@ -292,9 +178,10 @@ where
/// Return the current active validator set at header `header`.
///
/// Note that the validator set could be `None`. This is the case if we don't find
/// a BEEFY authority set change and we can't fetch the validator set from the
/// BEEFY on-chain state. Such a failure is usually an indication that the BEEFT
/// pallet has not been deployed (yet).
/// a BEEFY authority set change and we can't fetch the authority set from the
/// BEEFY on-chain state.
///
/// Such a failure is usually an indication that the BEEFT pallet has not been deployed (yet).
fn validator_set(&self, header: &B::Header) -> Option<ValidatorSet<P::Public>> {
if let Some(new) = find_authorities_change::<B, P::Public>(header) {
Some(new)
Expand Down
14 changes: 14 additions & 0 deletions primitives/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,20 @@ pub enum ConsensusLog<AuthorityId: Codec> {
MmrRoot(MmrRootHash),
}

/// BEEFY vote message.
///
/// A vote message is a direct vote created by a BEEFY node on every voting round
/// and is gossiped to its peers.
#[derive(Debug, Decode, Encode)]
pub struct VoteMessage<Hash, Number, Id, Signature> {
/// Commit to information extracted from a finalized block
pub commitment: Commitment<Number, Hash>,
/// Node authority id
pub id: Id,
/// Node signature
pub signature: Signature,
}

sp_api::decl_runtime_apis! {
/// API necessary for BEEFY voters.
pub trait BeefyApi<AuthorityId: Codec> {
Expand Down

0 comments on commit 9da8233

Please sign in to comment.