diff --git a/Cargo.lock b/Cargo.lock index 0fafedccb..d2decfb92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1559,12 +1559,14 @@ version = "0.1.0" dependencies = [ "ethabi", "futures", + "futures-timer", "hex", "hex-literal", "libsecp256k1 0.6.0", "log", "parity-scale-codec", "parking_lot 0.12.1", + "sc-chain-spec", "sc-client-api", "sc-keystore", "sc-network", @@ -1582,6 +1584,7 @@ dependencies = [ "sp-runtime", "strum 0.23.0", "substrate-prometheus-endpoint", + "substrate-test-runtime-client", "thiserror", ] diff --git a/client/src/service.rs b/client/src/service.rs index 7f2217f73..5d2b10a1f 100644 --- a/client/src/service.rs +++ b/client/src/service.rs @@ -325,6 +325,10 @@ pub fn new_full(mut config: Configuration, cli: &Cli) -> Result Result>( + genesis_hash: &Hash, + chain_spec: &Box, + ) -> std::borrow::Cow<'static, str> { + let chain_prefix = match chain_spec.fork_id() { + Some(fork_id) => format!("/{}/{}", hex::encode(genesis_hash), fork_id), + None => format!("/{}", hex::encode(genesis_hash)), + }; + format!("{}{}", chain_prefix, NAME).into() + } +} /// Returns the configuration value to put in /// [`sc_network::config::NetworkConfiguration::extra_sets`]. -pub fn ethy_peers_set_config() -> sc_network::config::NonDefaultSetConfig { - sc_network::config::NonDefaultSetConfig { - notifications_protocol: ETHY_PROTOCOL_NAME.into(), - max_notification_size: 1024 * 1024, - set_config: sc_network::config::SetConfig { - in_peers: 25, - out_peers: 25, - reserved_nodes: Vec::new(), - non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept, - }, - fallback_names: vec![], - } +pub fn ethy_peers_set_config( + protocol_name: std::borrow::Cow<'static, str>, +) -> sc_network::config::NonDefaultSetConfig { + let mut cfg = sc_network::config::NonDefaultSetConfig::new(protocol_name, 1024 * 1024); + cfg.allow_non_reserved(25, 25); + cfg } /// A convenience ETHY client trait that defines all the type bounds a ETHY client @@ -103,7 +116,7 @@ where BE: Backend, C: Client, C::Api: EthyApi, - N: GossipNetwork + Clone + Send + 'static, + N: GossipNetwork + Clone + SyncOracle + Send + 'static, { /// ETHY client pub client: Arc, @@ -117,6 +130,8 @@ where pub event_proof_sender: notification::EthyEventProofSender, /// Prometheus metric registry pub prometheus_registry: Option, + /// Chain specific Ethy protocol name. See [`ethy_protocol_name::standard_name`]. + pub protocol_name: std::borrow::Cow<'static, str>, pub _phantom: std::marker::PhantomData, } @@ -129,7 +144,7 @@ where BE: Backend, C: Client, C::Api: EthyApi, - N: GossipNetwork + Clone + Send + 'static, + N: GossipNetwork + Clone + SyncOracle + Sync + Send + 'static, { let EthyParams { client, @@ -138,12 +153,13 @@ where network, event_proof_sender, prometheus_registry, + protocol_name, _phantom: std::marker::PhantomData, } = ethy_params; + let sync_oracle = network.clone(); let gossip_validator = Arc::new(gossip::GossipValidator::new(Default::default())); - let gossip_engine = - GossipEngine::new(network, ETHY_PROTOCOL_NAME, gossip_validator.clone(), None); + let gossip_engine = GossipEngine::new(network, protocol_name, gossip_validator.clone(), None); let metrics = prometheus_registry.as_ref().map(metrics::Metrics::register).and_then( @@ -167,9 +183,10 @@ where gossip_engine, gossip_validator, metrics, + sync_oracle, }; - let worker = worker::EthyWorker::<_, _, _>::new(worker_params); + let worker = worker::EthyWorker::<_, _, _, _>::new(worker_params); worker.run().await } diff --git a/ethy-gadget/src/worker.rs b/ethy-gadget/src/worker.rs index a5009b98e..fdb9446a9 100644 --- a/ethy-gadget/src/worker.rs +++ b/ethy-gadget/src/worker.rs @@ -14,18 +14,18 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use std::{convert::TryInto, sync::Arc}; +use std::{sync::Arc, time::Duration}; use codec::{Codec, Decode, Encode}; -use futures::{future, FutureExt, StreamExt}; +use futures::StreamExt; use log::{debug, error, info, trace, warn}; -use parking_lot::Mutex; -use sc_client_api::{Backend, FinalityNotification, FinalityNotifications}; +use sc_client_api::{Backend, FinalityNotification}; use sc_network_gossip::GossipEngine; use sp_api::BlockId; +use sp_consensus::SyncOracle; use sp_runtime::{ generic::OpaqueDigestItemId, - traits::{Block, Header, NumberFor}, + traits::{Block, Header}, }; use seed_primitives::ethy::{ @@ -42,11 +42,7 @@ use crate::{ witness_record::WitnessRecord, Client, }; - -/// % signature to generate a proof -const PROOF_THRESHOLD: f32 = 0.6; - -pub(crate) struct WorkerParams +pub(crate) struct WorkerParams where B: Block, { @@ -57,10 +53,11 @@ where pub gossip_engine: GossipEngine, pub gossip_validator: Arc>, pub metrics: Option, + pub sync_oracle: SO, } -/// A ETHY worker plays the ETHY protocol -pub(crate) struct EthyWorker +/// An ETHY worker plays the ETHY protocol +pub(crate) struct EthyWorker where B: Block, BE: Backend, @@ -70,24 +67,26 @@ where backend: Arc, key_store: EthyKeystore, event_proof_sender: notification::EthyEventProofSender, - gossip_engine: Arc>>, + gossip_engine: GossipEngine, gossip_validator: Arc>, metrics: Option, - finality_notifications: FinalityNotifications, /// Tracks on-going witnesses witness_record: WitnessRecord, /// Best block we received a GRANDPA notification for - best_grandpa_block: NumberFor, + best_grandpa_block_header: ::Header, /// Current validator set validator_set: ValidatorSet, + /// Handle to the sync oracle + sync_oracle: SO, } -impl EthyWorker +impl EthyWorker where B: Block + Codec, BE: Backend, C: Client, C::Api: EthyApi, + SO: SyncOracle + Send + Sync + Clone + 'static, { /// Return a new ETHY worker instance. /// @@ -95,7 +94,7 @@ where /// ETHY pallet has been deployed on-chain. /// /// The ETHY pallet is needed in order to keep track of the ETHY authority set. - pub(crate) fn new(worker_params: WorkerParams) -> Self { + pub(crate) fn new(worker_params: WorkerParams) -> Self { let WorkerParams { client, backend, @@ -104,30 +103,36 @@ where gossip_engine, gossip_validator, metrics, + sync_oracle, } = worker_params; + let last_finalized_header = client + .expect_header(BlockId::number(client.info().finalized_number)) + .expect("latest block always has header available; qed."); + EthyWorker { client: client.clone(), backend, key_store, event_proof_sender, - gossip_engine: Arc::new(Mutex::new(gossip_engine)), + gossip_engine, gossip_validator, metrics, - finality_notifications: client.finality_notification_stream(), - best_grandpa_block: client.info().finalized_number, - validator_set: ValidatorSet { id: 0, validators: Default::default() }, + best_grandpa_block_header: last_finalized_header, + validator_set: ValidatorSet::empty(), witness_record: Default::default(), + sync_oracle, } } } -impl EthyWorker +impl EthyWorker where B: Block, BE: Backend, C: Client, C::Api: EthyApi, + SO: SyncOracle + Send + Sync + Clone + 'static, { /// Return the current active validator set at header `header`. /// @@ -137,11 +142,11 @@ where /// /// Such a failure is usually an indication that the ETHY pallet has not been deployed (yet). fn validator_set(&self, header: &B::Header) -> Option> { - let new = if let Some(new) = find_authorities_change::(header) { + let new = if let Some(new) = find_authorities_change::(header) { Some(new) } else { let at = BlockId::hash(header.hash()); - // queries the BEEFY pallet to get the active validator set public keys + // queries the Ethy pallet to get the active validator set public keys self.client.runtime_api().validator_set(&at).ok() }; @@ -152,7 +157,14 @@ where // For Ethy this would be a notification from something polling Ethereum full nodes fn handle_finality_notification(&mut self, notification: FinalityNotification) { - trace!(target: "ethy", "💎 finality notification for block #{:?}", ¬ification.header.number()); + debug!(target: "ethy", "💎 finality notification: {:?}", notification); + let number = *notification.header.number(); + + // On start-up ignore old finality notifications that we're not interested in. + if number <= *self.best_grandpa_block_header.number() { + debug!(target: "ethy", "💎 Got unexpected finality for old block #{:?}", number); + return + } if let Some(active) = self.validator_set(¬ification.header) { // Authority set change or genesis set id triggers new voting rounds @@ -228,8 +240,8 @@ where return }; - // Search from (self.best_grandpa_block - notification.block) to find all signing requests - // Sign and broadcast a witness + // Search from (self.best_grandpa_block_header - notification.block) to find all signing + // requests Sign and broadcast a witness for ProofRequest { message, event_id, tag, block } in extract_proof_requests::(¬ification.header, self.validator_set.id).into_iter() { @@ -260,11 +272,11 @@ where self.handle_witness(witness.clone()); // broadcast the witness - self.gossip_engine.lock().gossip_message(topic::(), broadcast_witness, false); + self.gossip_engine.gossip_message(topic::(), broadcast_witness, false); debug!(target: "ethy", "💎 gossiped witness for event: {:?}", witness.event_id); } - self.best_grandpa_block = *notification.header.number(); + self.best_grandpa_block_header = notification.header; } /// Note an individual witness for a message @@ -283,12 +295,18 @@ where return } - self.gossip_engine.lock().gossip_message(topic::(), witness.encode(), false); + self.gossip_engine.gossip_message(topic::(), witness.encode(), false); + + let proof_threshold = self.validator_set.proof_threshold as usize; + if proof_threshold < self.validator_set.validators.len() / 2 { + // safety check, < 50% doesn't make sense + error!(target: "ethy", "💎 Ethy proof threshold too low!: {:?}, validator set: {:?}", proof_threshold, self.validator_set.validators.len()); + return + } - let threshold = self.validator_set.validators.len() as f32 * PROOF_THRESHOLD; if self .witness_record - .has_consensus(witness.event_id, &witness.digest, threshold as usize) + .has_consensus(witness.event_id, &witness.digest, proof_threshold) { let signatures = self.witness_record.signatures_for(witness.event_id, &witness.digest); info!(target: "ethy", "💎 generating proof for event: {:?}, signatures: {:?}, validator set: {:?}", witness.event_id, signatures, self.validator_set.id); @@ -342,36 +360,43 @@ where } } + /// Main loop for Ethy worker. pub(crate) async fn run(mut self) { - let mut witnesses = - Box::pin(self.gossip_engine.lock().messages_for(topic::()).filter_map( - |notification| async move { - trace!(target: "ethy", "💎 got witness: {:?}", notification); + debug!(target: "Ethy", "💎 run Ethy worker, best finalized block: #{:?}.", self.best_grandpa_block_header.number()); - Witness::decode(&mut ¬ification.message[..]).ok() - }, - )); + let mut finality_notifications = self.client.finality_notification_stream().fuse(); + let mut witnesses = Box::pin(self.gossip_engine.messages_for(topic::()).filter_map( + |notification| async move { + trace!(target: "ethy", "💎 got witness: {:?}", notification); + + Witness::decode(&mut ¬ification.message[..]).ok() + }, + )) + .fuse(); loop { - let engine = self.gossip_engine.clone(); - let gossip_engine = future::poll_fn(|cx| engine.lock().poll_unpin(cx)); + while self.sync_oracle.is_major_syncing() { + debug!(target: "ethy", "💎 Waiting for major sync to complete..."); + futures_timer::Delay::new(Duration::from_secs(4)).await; + } + let mut gossip_engine = &mut self.gossip_engine; futures::select! { - notification = self.finality_notifications.next().fuse() => { + notification = finality_notifications.next() => { if let Some(notification) = notification { self.handle_finality_notification(notification); } else { return; } }, - witness = witnesses.next().fuse() => { + witness = witnesses.next() => { if let Some(witness) = witness { self.handle_witness(witness); } else { return; } }, - _ = gossip_engine.fuse() => { + _ = gossip_engine => { error!(target: "ethy", "💎 Gossip engine has terminated."); return; } @@ -434,16 +459,15 @@ where .collect() } -/// Scan the `header` digest log for a ETHY validator set change. Return either the new +/// Scan the `header` digest log for an Ethy validator set change. Return either the new /// validator set or `None` in case no validator set change has been signaled. -fn find_authorities_change(header: &B::Header) -> Option> +fn find_authorities_change(header: &B::Header) -> Option> where B: Block, - Id: Codec, { let id = OpaqueDigestItemId::Consensus(ÐY_ENGINE_ID); - let filter = |log: ConsensusLog| match log { + let filter = |log: ConsensusLog| match log { ConsensusLog::AuthoritiesChange(validator_set) => Some(validator_set), _ => None, }; @@ -478,9 +502,17 @@ fn eth_abi_encode_validator_set_change( } #[cfg(test)] -mod test { +pub(crate) mod test { use super::*; use sp_application_crypto::ByteArray; + use substrate_test_runtime_client::runtime::{Block, Digest, DigestItem}; + + use crate::testing::Keyring; + use seed_primitives::ethy::ValidatorSet; + + pub(crate) fn make_ethy_ids(keys: &[Keyring]) -> Vec { + keys.iter().map(|key| key.clone().public().into()).collect() + } #[test] fn encode_validator_set_change() { @@ -500,6 +532,7 @@ mod test { .unwrap(), ], id: 598, + proof_threshold: 2, }, 599, 1_234_567, @@ -509,4 +542,30 @@ mod test { "000000000000000000000000000000000000000000000000000000000000008000000000000000000000000000000000000000000000000000000000000002560000000000000000000000000000000000000000000000000000000000000257000000000000000000000000000000000000000000000000000000000012d687000000000000000000000000000000000000000000000000000000000000000200000000000000000000000058dad74c38e9c4738bf3471f6aac6124f862faf500000000000000000000000058dad74c38e9c4738bf3471f6aac6124f862faf5" ); } + + #[test] + fn extract_authorities_change_digest() { + let mut header = Header::new( + 1u32.into(), + Default::default(), + Default::default(), + Default::default(), + Digest::default(), + ); + + // verify empty digest shows nothing + assert!(find_authorities_change::(&header).is_none()); + + let id = 42; + let validators = make_ethy_ids(&[Keyring::Alice, Keyring::Bob]); + let validator_set = ValidatorSet { validators, id, proof_threshold: 2 }; + header.digest_mut().push(DigestItem::Consensus( + ETHY_ENGINE_ID, + ConsensusLog::::AuthoritiesChange(validator_set.clone()).encode(), + )); + + // verify validator set is correctly extracted from digest + let extracted = find_authorities_change::(&header); + assert_eq!(extracted, Some(validator_set)); + } } diff --git a/primitives/src/ethy.rs b/primitives/src/ethy.rs index dabc7c298..4a2149a2c 100644 --- a/primitives/src/ethy.rs +++ b/primitives/src/ethy.rs @@ -67,12 +67,14 @@ pub struct ValidatorSet { pub validators: Vec, /// Identifier of the validator set pub id: ValidatorSetId, + /// Minimum number of validator signatures required for a valid proof (i.e 'm' in 'm-of-n') + pub proof_threshold: u32, } impl ValidatorSet { /// Return an empty validator set with id of 0. pub fn empty() -> Self { - Self { validators: Default::default(), id: Default::default() } + Self { validators: Default::default(), id: Default::default(), proof_threshold: 0 } } } @@ -160,7 +162,7 @@ sp_api::decl_runtime_apis! { /// Runtime API for ETHY validators. pub trait EthyApi { - /// Return the active ETHY validator set (i.e eth bridge keys of active validator set) + /// Return the active ETHY validator set (i.e Ethy bridge keys of active validator set) fn validator_set() -> ValidatorSet; } }