From 87d54b2408e9946c4faebd2b72f31d2cff4c9d41 Mon Sep 17 00:00:00 2001 From: Mitchell Turner Date: Thu, 31 Aug 2023 16:12:22 -0700 Subject: [PATCH] Report Peers that give bad Block Info (#1331) First implementation of https://github.com/FuelLabs/fuel-core/issues/943 During block import, if the peer gives incorrect values, dock their peer score. If they successfully report a block, improve their peer score. --- CHANGELOG.md | 19 +- Cargo.lock | 1 + crates/fuel-core/src/service/adapters.rs | 23 +- crates/fuel-core/src/service/adapters/sync.rs | 79 ++++-- crates/fuel-core/src/service/sub_services.rs | 20 +- crates/services/p2p/src/heartbeat/handler.rs | 4 +- crates/services/p2p/src/p2p_service.rs | 6 +- crates/services/p2p/src/peer_manager.rs | 4 +- .../p2p/src/request_response/messages.rs | 8 +- crates/services/p2p/src/service.rs | 6 +- crates/services/sync/Cargo.toml | 1 + crates/services/sync/src/import.rs | 158 +++++++++-- .../services/sync/src/import/test_helpers.rs | 27 +- .../test_helpers/pressure_peer_to_peer.rs | 26 +- crates/services/sync/src/import/tests.rs | 265 +++++++++++++++++- crates/services/sync/src/ports.rs | 32 ++- crates/services/sync/src/service/tests.rs | 10 +- .../types/src/services/p2p/peer_reputation.rs | 39 --- 18 files changed, 581 insertions(+), 147 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fbba851129..88b456fba32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,14 +16,7 @@ Description of the upcoming release here. - [#1286](https://github.com/FuelLabs/fuel-core/pull/1286): Include readable names for test cases where missing. - [#1274](https://github.com/FuelLabs/fuel-core/pull/1274): Added tests to benchmark block synchronization. - [#1263](https://github.com/FuelLabs/fuel-core/pull/1263): Add gas benchmarks for `ED19` and `ECR1` instructions. - -#### Breaking - -- [#1322](https://github.com/FuelLabs/fuel-core/pull/1322): - The `debug` flag is added to the CLI. The flag should be used for local development only. Enabling debug mode: - - Allows GraphQL Endpoints to arbitrarily advance blocks. - - Enables debugger GraphQL Endpoints. - - Allows setting `utxo_validation` to `false`. +- [#1331](https://github.com/FuelLabs/fuel-core/pull/1331): Add peer reputation reporting to block import code ### Changed @@ -39,9 +32,11 @@ Description of the upcoming release here. - [#1290](https://github.com/FuelLabs/fuel-core/pull/1290): Standardize CLI args to use `-` instead of `_`. - [#1279](https://github.com/FuelLabs/fuel-core/pull/1279): Added a new CLI flag to enable the Relayer service `--enable-relayer`, and disabled the Relayer service by default. When supplying the `--enable-relayer` flag, the `--relayer` argument becomes mandatory, and omitting it is an error. Similarly, providing a `--relayer` argument without the `--enable-relayer` flag is an error. Lastly, providing the `--keypair` or `--network` arguments will also produce an error if the `--enable-p2p` flag is not set. - [#1262](https://github.com/FuelLabs/fuel-core/pull/1262): The `ConsensusParameters` aggregates all configuration data related to the consensus. It contains many fields that are segregated by the usage. The API of some functions was affected to use lesser types instead the whole `ConsensusParameters`. It is a huge breaking change requiring repetitively monotonically updating all places that use the `ConsensusParameters`. But during updating, consider that maybe you can use lesser types. Usage of them may simplify signatures of methods and make them more user-friendly and transparent. - -### Removed - -#### Breaking - [#1322](https://github.com/FuelLabs/fuel-core/pull/1322): The `manual_blocks_enabled` flag is removed from the CLI. The analog is a `debug` flag. +- [#1322](https://github.com/FuelLabs/fuel-core/pull/1322): + The `debug` flag is added to the CLI. The flag should be used for local development only. Enabling debug mode: + - Allows GraphQL Endpoints to arbitrarily advance blocks. + - Enables debugger GraphQL Endpoints. + - Allows setting `utxo_validation` to `false`. +### Removed diff --git a/Cargo.lock b/Cargo.lock index bcc63f2e506..f59bf26b48f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3036,6 +3036,7 @@ dependencies = [ "test-case", "tokio", "tracing", + "tracing-subscriber", ] [[package]] diff --git a/crates/fuel-core/src/service/adapters.rs b/crates/fuel-core/src/service/adapters.rs index abf801db55d..1fc252ab59f 100644 --- a/crates/fuel-core/src/service/adapters.rs +++ b/crates/fuel-core/src/service/adapters.rs @@ -5,6 +5,8 @@ use crate::{ use fuel_core_consensus_module::block_verifier::Verifier; use fuel_core_txpool::service::SharedState as TxPoolSharedState; use fuel_core_types::fuel_types::BlockHeight; +#[cfg(feature = "p2p")] +use fuel_core_types::services::p2p::peer_reputation::AppScore; use std::sync::Arc; pub mod block_importer; @@ -87,6 +89,17 @@ pub struct BlockImporterAdapter { #[derive(Clone)] pub struct P2PAdapter { service: Option, + peer_report_config: PeerReportConfig, +} + +#[cfg(feature = "p2p")] +#[derive(Clone)] +pub struct PeerReportConfig { + pub successful_block_import: AppScore, + pub missing_block_headers: AppScore, + pub bad_block_header: AppScore, + pub missing_transactions: AppScore, + pub invalid_transactions: AppScore, } #[cfg(not(feature = "p2p"))] @@ -95,8 +108,14 @@ pub struct P2PAdapter; #[cfg(feature = "p2p")] impl P2PAdapter { - pub fn new(service: Option) -> Self { - Self { service } + pub fn new( + service: Option, + peer_report_config: PeerReportConfig, + ) -> Self { + Self { + service, + peer_report_config, + } } } diff --git a/crates/fuel-core/src/service/adapters/sync.rs b/crates/fuel-core/src/service/adapters/sync.rs index 265eb199040..5482eec1191 100644 --- a/crates/fuel-core/src/service/adapters/sync.rs +++ b/crates/fuel-core/src/service/adapters/sync.rs @@ -7,6 +7,7 @@ use fuel_core_services::stream::BoxStream; use fuel_core_sync::ports::{ BlockImporterPort, ConsensusPort, + PeerReportReason, PeerToPeerPort, }; use fuel_core_types::{ @@ -21,6 +22,10 @@ use fuel_core_types::{ fuel_tx::Transaction, fuel_types::BlockHeight, services::p2p::{ + peer_reputation::{ + AppScore, + PeerReport, + }, PeerId, SourcePeer, }, @@ -46,25 +51,17 @@ impl PeerToPeerPort for P2PAdapter { async fn get_sealed_block_headers( &self, block_range_height: Range, - ) -> anyhow::Result>>> { + ) -> anyhow::Result>>> { if let Some(service) = &self.service { - Ok(service - .get_sealed_block_headers(block_range_height) - .await? - .and_then(|(peer_id, headers)| { - let peer_id: PeerId = peer_id.into(); - headers.map(|headers| { - headers - .into_iter() - .map(|header| SourcePeer { - peer_id: peer_id.clone(), - data: header, - }) - .collect() - }) - })) + let (peer_id, headers) = + service.get_sealed_block_headers(block_range_height).await?; + let sourced_headers = SourcePeer { + peer_id: peer_id.into(), + data: headers, + }; + Ok(sourced_headers) } else { - Ok(None) + Err(anyhow::anyhow!("No P2P service available")) } } @@ -81,11 +78,57 @@ impl PeerToPeerPort for P2PAdapter { .get_transactions_from_peer(peer_id.into(), block) .await } else { - Ok(None) + Err(anyhow::anyhow!("No P2P service available")) + } + } + + async fn report_peer( + &self, + peer: PeerId, + report: PeerReportReason, + ) -> anyhow::Result<()> { + if let Some(service) = &self.service { + let service_name = "Sync"; + let new_report = self.process_report(report); + service.report_peer(peer, new_report, service_name)?; + Ok(()) + } else { + Err(anyhow::anyhow!("No P2P service available")) } } } +impl P2PAdapter { + fn process_report(&self, reason: PeerReportReason) -> P2PAdapterPeerReport { + let score = match &reason { + PeerReportReason::SuccessfulBlockImport => { + self.peer_report_config.successful_block_import + } + PeerReportReason::MissingBlockHeaders => { + self.peer_report_config.missing_block_headers + } + PeerReportReason::BadBlockHeader => self.peer_report_config.bad_block_header, + PeerReportReason::MissingTransactions => { + self.peer_report_config.missing_transactions + } + PeerReportReason::InvalidTransactions => { + self.peer_report_config.invalid_transactions + } + }; + P2PAdapterPeerReport { score } + } +} + +struct P2PAdapterPeerReport { + score: AppScore, +} + +impl PeerReport for P2PAdapterPeerReport { + fn get_score_from_report(&self) -> AppScore { + self.score + } +} + #[async_trait::async_trait] impl BlockImporterPort for BlockImporterAdapter { fn committed_height_stream(&self) -> BoxStream { diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index 5d5265990fa..2715cfa3336 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -109,8 +109,24 @@ pub fn init_sub_services( }; #[cfg(feature = "p2p")] - let p2p_adapter = - P2PAdapter::new(network.as_ref().map(|network| network.shared.clone())); + let p2p_adapter = { + use crate::service::adapters::PeerReportConfig; + + // Hardcoded for now, but left here to be configurable in the future. + // TODO: https://github.com/FuelLabs/fuel-core/issues/1340 + let peer_report_config = PeerReportConfig { + successful_block_import: 5., + missing_block_headers: -100., + bad_block_header: -100., + missing_transactions: -100., + invalid_transactions: -100., + }; + P2PAdapter::new( + network.as_ref().map(|network| network.shared.clone()), + peer_report_config, + ) + }; + #[cfg(not(feature = "p2p"))] let p2p_adapter = P2PAdapter::new(); diff --git a/crates/services/p2p/src/heartbeat/handler.rs b/crates/services/p2p/src/heartbeat/handler.rs index ce2c8edfeac..5cae684250d 100644 --- a/crates/services/p2p/src/heartbeat/handler.rs +++ b/crates/services/p2p/src/heartbeat/handler.rs @@ -172,8 +172,8 @@ impl ConnectionHandler for HeartbeatHandler { Self::Error, >, > { - if let Some(inbound_block_height) = self.inbound.as_mut() { - match inbound_block_height.poll_unpin(cx) { + if let Some(inbound_stream_and_block_height) = self.inbound.as_mut() { + match inbound_stream_and_block_height.poll_unpin(cx) { Poll::Ready(Err(_)) => { debug!(target: "fuel-libp2p", "Incoming heartbeat errored"); self.inbound = None; diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 072a1bce78b..499f2ed0d5e 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -613,7 +613,7 @@ impl FuelP2PService { Some(ResponseChannelItem::SealedHeaders(channel)), Ok(ResponseMessage::SealedHeaders(headers)), ) => { - if channel.send(Some((peer, headers))).is_err() { + if channel.send((peer, headers)).is_err() { debug!( "Failed to send through the channel for {:?}", request_id @@ -1191,7 +1191,7 @@ mod tests { } tracing::info!("Node B Event: {:?}", node_b_event); } - }; + } } } @@ -1595,7 +1595,7 @@ mod tests { let expected = arbitrary_headers_for_range(range.clone()); - if let Ok(Some((_, sealed_headers))) = response_message { + if let Ok((_, sealed_headers)) = response_message { let check = expected.iter().zip(sealed_headers.unwrap().iter()).all(|(a, b)| eq_except_metadata(a, b)); let _ = tx_test_end.send(check).await; } else { diff --git a/crates/services/p2p/src/peer_manager.rs b/crates/services/p2p/src/peer_manager.rs index e65ad104ea6..6a723e9566d 100644 --- a/crates/services/p2p/src/peer_manager.rs +++ b/crates/services/p2p/src/peer_manager.rs @@ -115,11 +115,11 @@ impl PeerManager { peer_id: &PeerId, block_height: BlockHeight, ) { - if let Some(previous_heartbeat) = self + if let Some(time_elapsed) = self .get_peer_info(peer_id) .and_then(|info| info.heartbeat_data.seconds_since_last_heartbeat()) { - debug!(target: "fuel-p2p", "Previous hearbeat happened {:?} seconds ago", previous_heartbeat); + debug!(target: "fuel-p2p", "Previous heartbeat happened {:?} seconds ago", time_elapsed); } let heartbeat_data = HeartbeatData::new(block_height); diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 285a3c2f53d..d466f16cfa5 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -28,8 +28,6 @@ pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &[u8] = b"/fuel/req_res/0.0.1"; /// Max Size in Bytes of the Request Message pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::(); -pub type ChannelItem = oneshot::Sender>; - // Peer receives a `RequestMessage`. // It prepares a response in form of `OutboundResponse` // This `OutboundResponse` gets prepared to be sent over the wire in `NetworkResponse` format. @@ -59,9 +57,9 @@ pub enum ResponseMessage { /// Holds oneshot channels for specific responses #[derive(Debug)] pub enum ResponseChannelItem { - Block(ChannelItem), - SealedHeaders(ChannelItem<(PeerId, Option>)>), - Transactions(ChannelItem>), + Block(oneshot::Sender>), + SealedHeaders(oneshot::Sender<(PeerId, Option>)>), + Transactions(oneshot::Sender>>), } /// Response that is sent over the wire diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 11f602fd286..136b0f7e758 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -85,7 +85,7 @@ enum TaskRequest { }, GetSealedHeaders { block_height_range: Range, - channel: oneshot::Sender>)>>, + channel: oneshot::Sender<(PeerId, Option>)>, }, GetTransactions { block_id: BlockId, @@ -384,7 +384,7 @@ impl SharedState { pub async fn get_sealed_block_headers( &self, block_height_range: Range, - ) -> anyhow::Result, Option>)>> { + ) -> anyhow::Result<(Vec, Option>)> { let (sender, receiver) = oneshot::channel(); if block_height_range.is_empty() { @@ -402,7 +402,7 @@ impl SharedState { receiver .await - .map(|o| o.map(|(peer_id, headers)| (peer_id.to_bytes(), headers))) + .map(|(peer_id, headers)| (peer_id.to_bytes(), headers)) .map_err(|e| anyhow!("{}", e)) } diff --git a/crates/services/sync/Cargo.toml b/crates/services/sync/Cargo.toml index d7cb0dc1711..13f40c3b8fd 100644 --- a/crates/services/sync/Cargo.toml +++ b/crates/services/sync/Cargo.toml @@ -24,6 +24,7 @@ fuel-core-trace = { path = "../../trace" } fuel-core-types = { path = "../../types", features = ["test-helpers"] } mockall = { workspace = true } test-case = { workspace = true } +tracing-subscriber = { workspace = true } [features] benchmarking = ["dep:mockall", "fuel-core-types/test-helpers"] diff --git a/crates/services/sync/src/import.rs b/crates/services/sync/src/import.rs index cd9c355a5e8..895542efa70 100644 --- a/crates/services/sync/src/import.rs +++ b/crates/services/sync/src/import.rs @@ -22,7 +22,10 @@ use fuel_core_types::{ SealedBlock, SealedBlockHeader, }, - services::p2p::SourcePeer, + services::p2p::{ + PeerId, + SourcePeer, + }, }; use futures::{ stream::StreamExt, @@ -36,6 +39,7 @@ use crate::{ ports::{ BlockImporterPort, ConsensusPort, + PeerReportReason, PeerToPeerPort, }, state::State, @@ -214,17 +218,28 @@ where .then({ let state = state.clone(); let executor = executor.clone(); - move |block| { + let p2p = p2p.clone(); + move |res| { + let p2p = p2p.clone(); let state = state.clone(); let executor = executor.clone(); async move { - // Short circuit on error. - let block = match block { - Ok(b) => b, - Err(e) => return Err(e), - }; - - execute_and_commit(executor.as_ref(), &state, block).await + let (peer_id, block) = res?; + + let res = execute_and_commit(executor.as_ref(), &state, block).await; + match &res { + Ok(_) => { + let _ = p2p.report_peer(peer_id.clone(), PeerReportReason::SuccessfulBlockImport) + .await + .map_err(|e| tracing::error!("Failed to report successful block import for peer {:?}: {:?}", peer_id, e)); + }, + Err(e) => { + // If this fails, then it means that consensus has approved a block that is invalid. + // This would suggest a more serious issue than a bad peer, e.g. a fork or an out-of-date client. + tracing::error!("Failed to execute and commit block from peer {:?}: {:?}", peer_id, e); + }, + } + res } } .instrument(tracing::debug_span!("execute_and_commit")) @@ -259,7 +274,8 @@ fn get_block_stream< params: &Config, p2p: Arc

, consensus: Arc, -) -> impl Stream>>> { +) -> impl Stream>>> +{ get_header_stream(range, params, p2p.clone()).map({ let p2p = p2p.clone(); let consensus_port = consensus.clone(); @@ -318,7 +334,7 @@ async fn get_sealed_blocks< result: anyhow::Result>, p2p: Arc

, consensus_port: Arc, -) -> anyhow::Result> { +) -> anyhow::Result> { let header = match result { Ok(h) => h, Err(e) => return Err(e), @@ -328,14 +344,26 @@ async fn get_sealed_blocks< data: header, } = header; let id = header.entity.id(); - let block_id = SourcePeer { peer_id, data: id }; + let block_id = SourcePeer { + peer_id: peer_id.clone(), + data: id, + }; // Check the consensus is valid on this header. if !consensus_port .check_sealed_header(&header) .trace_err("Failed to check consensus on header")? { - tracing::warn!("Header {:?} failed consensus check", header); + let _ = p2p + .report_peer(peer_id.clone(), PeerReportReason::BadBlockHeader) + .await + .map_err(|e| { + tracing::error!( + "Failed to report bad block header from peer {:?}: {:?}", + peer_id, + e + ) + }); return Ok(None) } @@ -344,7 +372,7 @@ async fn get_sealed_blocks< .await_da_height(&header.entity.da_height) .await?; - get_transactions_on_block(p2p.as_ref(), block_id, header).await + get_transactions_on_block(p2p.as_ref(), block_id, header, &peer_id).await } /// Waits for a notify or shutdown signal. @@ -381,14 +409,32 @@ async fn get_headers_batch( .await .trace_err("Failed to get headers"); let sorted_headers = match res { - Ok(None) => - vec![Err(anyhow::anyhow!("Headers provider was unable to fulfill request for unspecified reason. Possibly because requested batch size was too large"))], - Ok(Some(headers)) => headers + Ok(sourced_headers) => { + let SourcePeer { + peer_id, + data: maybe_headers, + } = sourced_headers; + let cloned_peer_id = peer_id.clone(); + let headers = match maybe_headers { + None => { + tracing::error!( + "No headers received from peer {:?} for range {} to {}", + peer_id, + start, + end + ); + vec![Err(anyhow::anyhow!("Headers provider was unable to fulfill request for unspecified reason. Possibly because requested batch size was too large"))] + } + Some(headers) => headers .into_iter() .map(move |header| { let header = range.next().and_then(|height| { - if *(header.data.entity.height()) == height.into() { - Some(header) + if *(header.entity.height()) == height.into() { + let sourced_header = SourcePeer { + peer_id: cloned_peer_id.clone(), + data: header, + }; + Some(sourced_header) } else { None } @@ -396,6 +442,29 @@ async fn get_headers_batch( Ok(header) }) .collect(), + }; + if let Some(expected_len) = end.checked_sub(start) { + if headers.len() != expected_len as usize + || headers.iter().any(|h| h.is_err()) + { + let _ = p2p + .report_peer( + peer_id.clone(), + PeerReportReason::MissingBlockHeaders, + ) + .await + .map_err(|e| { + tracing::error!( + "Failed to report missing block header from peer {:?}: {:?}", + peer_id, + e + ) + }); + } + } + headers + } + Err(e) => vec![Err(e)], }; futures::stream::iter(sorted_headers) @@ -413,7 +482,8 @@ async fn get_transactions_on_block

( p2p: &P, block_id: SourcePeer, header: SealedBlockHeader, -) -> anyhow::Result> + peer_id: &PeerId, +) -> anyhow::Result> where P: PeerToPeerPort + Send + Sync + 'static, { @@ -423,19 +493,47 @@ where } = header; // Request the transactions for this block. - Ok(p2p + let maybe_txs = p2p .get_transactions(block_id) .await .trace_err("Failed to get transactions")? - .trace_none_warn("Could not find transactions for header") - .and_then(|transactions| { - let block = Block::try_from_executed(header, transactions) - .trace_none_warn("Failed to created header from executed transactions")?; - Some(SealedBlock { - entity: block, - consensus, - }) - })) + .trace_none_warn("Could not find transactions for header"); + match maybe_txs { + None => { + let _ = p2p + .report_peer(peer_id.clone(), PeerReportReason::MissingTransactions) + .await + .map_err(|e| { + tracing::error!( + "Failed to report missing transactions from peer {:?}: {:?}", + peer_id, + e + ) + }); + Ok(None) + } + Some(transactions) => { + match Block::try_from_executed(header, transactions) { + Some(block) => Ok(Some(( + peer_id.clone(), + SealedBlock { + entity: block, + consensus, + }, + ))), + None => { + tracing::error!( + "Failed to created block from header and transactions" + ); + let _ = p2p + .report_peer(peer_id.clone(), PeerReportReason::InvalidTransactions) + .await + .map_err(|e| tracing::error!("Failed to report invalid transaction from peer {:?}: {:?}", peer_id, e)); + Ok(None) + } + } + } + } } #[tracing::instrument( diff --git a/crates/services/sync/src/import/test_helpers.rs b/crates/services/sync/src/import/test_helpers.rs index 939dbd7befb..a4efd9a6f3f 100644 --- a/crates/services/sync/src/import/test_helpers.rs +++ b/crates/services/sync/src/import/test_helpers.rs @@ -15,18 +15,21 @@ use fuel_core_types::{ SealedBlockHeader, }, fuel_types::BlockHeight, - services::p2p::SourcePeer, }; pub use counts::{ Count, SharedCounts, }; +use fuel_core_types::services::p2p::{ + PeerId, + SourcePeer, +}; pub use pressure_block_importer::PressureBlockImporter; pub use pressure_consensus::PressureConsensus; pub use pressure_peer_to_peer::PressurePeerToPeer; -pub fn empty_header(h: BlockHeight) -> SourcePeer { +pub fn empty_header(h: BlockHeight) -> SealedBlockHeader { let mut header = BlockHeader::default(); header.consensus.height = h; let transaction_tree = @@ -34,12 +37,24 @@ pub fn empty_header(h: BlockHeight) -> SourcePeer { header.application.generated.transactions_root = transaction_tree.root().into(); let consensus = Consensus::default(); - let sealed = Sealed { + Sealed { entity: header, consensus, - }; + } +} + +pub fn peer_sourced_headers( + headers: Option>, +) -> SourcePeer>> { + peer_sourced_headers_peer_id(headers, vec![].into()) +} + +pub fn peer_sourced_headers_peer_id( + headers: Option>, + peer_id: PeerId, +) -> SourcePeer>> { SourcePeer { - peer_id: vec![].into(), - data: sealed, + peer_id, + data: headers, } } diff --git a/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs b/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs index b0d5b93b946..108efed4ab8 100644 --- a/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs +++ b/crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs @@ -5,6 +5,7 @@ use crate::{ }, ports::{ MockPeerToPeerPort, + PeerReportReason, PeerToPeerPort, }, }; @@ -16,7 +17,10 @@ use fuel_core_types::{ }, fuel_tx::Transaction, fuel_types::BlockHeight, - services::p2p::SourcePeer, + services::p2p::{ + PeerId, + SourcePeer, + }, }; use std::{ ops::Range, @@ -38,7 +42,7 @@ impl PeerToPeerPort for PressurePeerToPeer { async fn get_sealed_block_headers( &self, block_height_range: Range, - ) -> anyhow::Result>>> { + ) -> anyhow::Result>>> { self.counts.apply(|c| c.inc_headers()); tokio::time::sleep(self.durations[0]).await; self.counts.apply(|c| c.dec_headers()); @@ -57,19 +61,33 @@ impl PeerToPeerPort for PressurePeerToPeer { self.counts.apply(|c| c.dec_transactions()); self.p2p.get_transactions(block_id).await } + + async fn report_peer( + &self, + _peer: PeerId, + _report: PeerReportReason, + ) -> anyhow::Result<()> { + Ok(()) + } } impl PressurePeerToPeer { pub fn new(counts: SharedCounts, delays: [Duration; 2]) -> Self { let mut mock = MockPeerToPeerPort::default(); mock.expect_get_sealed_block_headers().returning(|range| { - Ok(Some( + let headers = Some( range .clone() .map(BlockHeight::from) .map(empty_header) .collect(), - )) + ); + let peer_id = vec![].into(); + let source_peer_data = SourcePeer { + peer_id, + data: headers, + }; + Ok(source_peer_data) }); mock.expect_get_transactions() .returning(|_| Ok(Some(vec![]))); diff --git a/crates/services/sync/src/import/tests.rs b/crates/services/sync/src/import/tests.rs index d81a9bea606..38f7c57f120 100644 --- a/crates/services/sync/src/import/tests.rs +++ b/crates/services/sync/src/import/tests.rs @@ -1,13 +1,19 @@ #![allow(non_snake_case)] use crate::{ - import::test_helpers::empty_header, + import::test_helpers::{ + empty_header, + peer_sourced_headers, + peer_sourced_headers_peer_id, + }, ports::{ MockBlockImporterPort, MockConsensusPort, MockPeerToPeerPort, + PeerReportReason, }, }; +use fuel_core_types::fuel_tx::Transaction; use test_case::test_case; use super::*; @@ -80,7 +86,7 @@ async fn import__header_not_found() { let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| Ok(Some(Vec::new()))); + .returning(|_| Ok(peer_sourced_headers(Some(Vec::new())))); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -102,7 +108,7 @@ async fn import__header_response_incomplete() { let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| Ok(None)); + .returning(|_| Ok(peer_sourced_headers(None))); let state = State::new(3, 5).into(); let mocks = Mocks { @@ -124,7 +130,7 @@ async fn import__header_5_not_found() { let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| Ok(Some(vec![empty_header(4.into())]))); + .returning(|_| Ok(peer_sourced_headers(Some(vec![empty_header(4.into())])))); p2p.expect_get_transactions() .times(1) .returning(|_| Ok(Some(vec![]))); @@ -149,7 +155,7 @@ async fn import__header_4_not_found() { let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| Ok(Some(vec![empty_header(5.into())]))); + .returning(|_| Ok(peer_sourced_headers(Some(vec![empty_header(5.into())])))); p2p.expect_get_transactions() .times(0) .returning(|_| Ok(Some(vec![]))); @@ -174,7 +180,12 @@ async fn import__transactions_not_found() { let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| Ok(Some(vec![empty_header(4.into()), empty_header(5.into())]))); + .returning(|_| { + Ok(peer_sourced_headers(Some(vec![ + empty_header(4.into()), + empty_header(5.into()), + ]))) + }); p2p.expect_get_transactions() .times(2) .returning(|_| Ok(None)); @@ -199,7 +210,12 @@ async fn import__transactions_not_found_for_header_4() { let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| Ok(Some(vec![empty_header(4.into()), empty_header(5.into())]))); + .returning(|_| { + Ok(peer_sourced_headers(Some(vec![ + empty_header(4.into()), + empty_header(5.into()), + ]))) + }); let mut height = 3; p2p.expect_get_transactions().times(2).returning(move |_| { height += 1; @@ -230,7 +246,12 @@ async fn import__transactions_not_found_for_header_5() { let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| Ok(Some(vec![empty_header(4.into()), empty_header(5.into())]))); + .returning(|_| { + Ok(peer_sourced_headers(Some(vec![ + empty_header(4.into()), + empty_header(5.into()), + ]))) + }); let mut height = 3; p2p.expect_get_transactions().times(2).returning(move |_| { height += 1; @@ -283,7 +304,12 @@ async fn import__p2p_error_on_4_transactions() { let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| Ok(Some(vec![empty_header(4.into()), empty_header(5.into())]))); + .returning(|_| { + Ok(peer_sourced_headers(Some(vec![ + empty_header(4.into()), + empty_header(5.into()), + ]))) + }); let mut height = 3; p2p.expect_get_transactions().times(2).returning(move |_| { height += 1; @@ -314,7 +340,12 @@ async fn import__p2p_error_on_5_transactions() { let mut p2p = MockPeerToPeerPort::default(); p2p.expect_get_sealed_block_headers() .times(1) - .returning(|_| Ok(Some(vec![empty_header(4.into()), empty_header(5.into())]))); + .returning(|_| { + Ok(peer_sourced_headers(Some(vec![ + empty_header(4.into()), + empty_header(5.into()), + ]))) + }); let mut height = 3; p2p.expect_get_transactions().times(2).returning(move |_| { height += 1; @@ -497,7 +528,7 @@ async fn import__can_work_in_two_loops() { .returning(move |range| { state.apply(|s| s.observe(6)); let headers = range.clone().map(|h| empty_header(h.into())).collect(); - Ok(Some(headers)) + Ok(peer_sourced_headers(Some(headers))) }); p2p.expect_get_transactions() .times(3) @@ -524,13 +555,14 @@ async fn test_import_inner( let notify = Arc::new(Notify::new()); let Mocks { consensus_port, - p2p, + mut p2p, executor, } = mocks; let params = Config { block_stream_buffer_size: 10, header_batch_size: 10, }; + p2p.expect_report_peer().returning(|_, _| Ok(())); let p2p = Arc::new(p2p); let executor = Arc::new(executor); @@ -567,6 +599,211 @@ async fn test_import_inner( (final_state, received_notify_signal) } +#[tokio::test] +async fn import__happy_path_sends_good_peer_report() { + // Given + PeerReportTestBuider::new() + // When (no changes) + // Then + .run_with_expected_report(PeerReportReason::SuccessfulBlockImport) + .await; +} + +#[tokio::test] +async fn import__multiple_blocks_happy_path_sends_good_peer_report() { + // Given + PeerReportTestBuider::new() + // When + .times(3) + // Then + .run_with_expected_report(PeerReportReason::SuccessfulBlockImport) + .await; +} + +#[tokio::test] +async fn import__missing_headers_sends_peer_report() { + // Given + PeerReportTestBuider::new() + // When + .with_get_headers(None) + // Then + .run_with_expected_report(PeerReportReason::MissingBlockHeaders) + .await; +} + +#[tokio::test] +async fn import__bad_block_header_sends_peer_report() { + // Given + PeerReportTestBuider::new() + // When + .with_check_sealed_header(false) + // Then + .run_with_expected_report(PeerReportReason::BadBlockHeader) + .await; +} + +#[tokio::test] +async fn import__missing_transactions_sends_peer_report() { + // Given + PeerReportTestBuider::new() + // When + .with_get_transactions(None) + // Then + .run_with_expected_report(PeerReportReason::MissingTransactions) + .await; +} + +struct PeerReportTestBuider { + shared_peer_id: Vec, + get_sealed_headers: Option>>, + get_transactions: Option>>, + check_sealed_header: Option, + block_count: u32, + debug: bool, +} + +impl PeerReportTestBuider { + pub fn new() -> Self { + Self { + shared_peer_id: vec![1, 2, 3, 4], + get_sealed_headers: None, + get_transactions: None, + check_sealed_header: None, + block_count: 1, + debug: false, + } + } + + #[allow(dead_code)] + pub fn debug(mut self) -> Self { + self.debug = true; + self + } + + pub fn with_get_headers( + mut self, + get_headers: Option>, + ) -> Self { + self.get_sealed_headers = Some(get_headers); + self + } + + pub fn with_get_transactions( + mut self, + get_transactions: Option>, + ) -> Self { + self.get_transactions = Some(get_transactions); + self + } + + pub fn with_check_sealed_header(mut self, check_sealed_header: bool) -> Self { + self.check_sealed_header = Some(check_sealed_header); + self + } + + pub fn times(mut self, block_count: u32) -> Self { + self.block_count = block_count; + self + } + + pub async fn run_with_expected_report(self, expected_report: PeerReportReason) { + if self.debug { + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::DEBUG) + .try_init(); + } + + let p2p = self.p2p(expected_report); + let executor = self.executor(); + let consensus = self.consensus(); + + let index = self.block_count - 1; + let state = State::new(None, index).into(); + + let notify = Arc::new(Notify::new()); + + let params = Config { + block_stream_buffer_size: 10, + header_batch_size: 10, + }; + + let import = Import { + state, + notify, + params, + p2p, + executor, + consensus, + }; + let (_tx, shutdown) = + tokio::sync::watch::channel(fuel_core_services::State::Started); + let mut watcher = shutdown.into(); + + import.notify.notify_one(); + let _ = import.import(&mut watcher).await; + } + + fn p2p(&self, expected_report: PeerReportReason) -> Arc { + let peer_id = self.shared_peer_id.clone(); + let mut p2p = MockPeerToPeerPort::default(); + + if let Some(get_headers) = self.get_sealed_headers.clone() { + p2p.expect_get_sealed_block_headers().returning(move |_| { + Ok(peer_sourced_headers_peer_id( + get_headers.clone(), + peer_id.clone().into(), + )) + }); + } else { + p2p.expect_get_sealed_block_headers() + .returning(move |range| { + Ok(peer_sourced_headers_peer_id( + Some(range.clone().map(|h| empty_header(h.into())).collect()), + peer_id.clone().into(), + )) + }); + } + + let get_transactions = self.get_transactions.clone().unwrap_or(Some(vec![])); + p2p.expect_get_transactions() + .returning(move |_| Ok(get_transactions.clone())); + + let peer_id = self.shared_peer_id.clone(); + p2p.expect_report_peer() + .times(self.block_count as usize) + .withf(move |peer, report| { + let peer_id = peer_id.clone(); + peer.as_ref() == peer_id && report == &expected_report + }) + .returning(|_, _| Ok(())); + + Arc::new(p2p) + } + + fn executor(&self) -> Arc { + let mut executor = MockBlockImporterPort::default(); + + executor.expect_execute_and_commit().returning(|_| Ok(())); + + Arc::new(executor) + } + + fn consensus(&self) -> Arc { + let mut consensus_port = MockConsensusPort::default(); + + consensus_port + .expect_await_da_height() + .returning(|_| Ok(())); + + let check_sealed_header = self.check_sealed_header.unwrap_or(true); + consensus_port + .expect_check_sealed_header() + .returning(move |_| Ok(check_sealed_header)); + + Arc::new(consensus_port) + } +} + struct Mocks { consensus_port: MockConsensusPort, p2p: MockPeerToPeerPort, @@ -638,9 +875,9 @@ impl DefaultMocks for MockPeerToPeerPort { p2p.expect_get_sealed_block_headers() .times(1) .returning(|range| { - Ok(Some( + Ok(peer_sourced_headers(Some( range.clone().map(|h| empty_header(h.into())).collect(), - )) + ))) }); p2p.expect_get_transactions() .times(t.next().unwrap()) diff --git a/crates/services/sync/src/ports.rs b/crates/services/sync/src/ports.rs index bdd860212a4..1d270a70ed8 100644 --- a/crates/services/sync/src/ports.rs +++ b/crates/services/sync/src/ports.rs @@ -12,10 +12,31 @@ use fuel_core_types::{ }, fuel_tx::Transaction, fuel_types::BlockHeight, - services::p2p::SourcePeer, + services::p2p::{ + PeerId, + SourcePeer, + }, }; use std::ops::Range; +/// Possible reasons to report a peer +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum PeerReportReason { + // Good + /// Successfully imported block + SuccessfulBlockImport, + + // Bad + /// Did not receive advertised block headers + MissingBlockHeaders, + /// Report a peer for sending a bad block header + BadBlockHeader, + /// Did not receive advertised transactions + MissingTransactions, + /// Received invalid transactions + InvalidTransactions, +} + #[cfg_attr(any(test, feature = "benchmarking"), mockall::automock)] #[async_trait::async_trait] /// Port for communication with the network. @@ -27,7 +48,7 @@ pub trait PeerToPeerPort { async fn get_sealed_block_headers( &self, block_height_range: Range, - ) -> anyhow::Result>>>; + ) -> anyhow::Result>>>; /// Request transactions from the network for the given block /// and source peer. @@ -35,6 +56,13 @@ pub trait PeerToPeerPort { &self, block_id: SourcePeer, ) -> anyhow::Result>>; + + /// Report a peer for some reason to modify their reputation. + async fn report_peer( + &self, + peer: PeerId, + report: PeerReportReason, + ) -> anyhow::Result<()>; } #[cfg_attr(any(test, feature = "benchmarking"), mockall::automock)] diff --git a/crates/services/sync/src/service/tests.rs b/crates/services/sync/src/service/tests.rs index bd1d59123fa..64dad7c22b1 100644 --- a/crates/services/sync/src/service/tests.rs +++ b/crates/services/sync/src/service/tests.rs @@ -8,7 +8,10 @@ use futures::{ }; use crate::{ - import::test_helpers::empty_header, + import::test_helpers::{ + empty_header, + peer_sourced_headers, + }, ports::{ MockBlockImporterPort, MockConsensusPort, @@ -21,6 +24,7 @@ use super::*; #[tokio::test] async fn test_new_service() { let mut p2p = MockPeerToPeerPort::default(); + p2p.expect_report_peer().returning(|_, _| Ok(())); p2p.expect_height_stream().returning(|| { stream::iter( std::iter::successors(Some(6u32), |n| Some(n + 1)).map(BlockHeight::from), @@ -34,13 +38,13 @@ async fn test_new_service() { .into_boxed() }); p2p.expect_get_sealed_block_headers().returning(|range| { - Ok(Some( + Ok(peer_sourced_headers(Some( range .clone() .map(BlockHeight::from) .map(empty_header) .collect(), - )) + ))) }); p2p.expect_get_transactions() .returning(|_| Ok(Some(vec![]))); diff --git a/crates/types/src/services/p2p/peer_reputation.rs b/crates/types/src/services/p2p/peer_reputation.rs index 9fd02b67fc8..7c83aab49a4 100644 --- a/crates/types/src/services/p2p/peer_reputation.rs +++ b/crates/types/src/services/p2p/peer_reputation.rs @@ -15,42 +15,3 @@ pub trait PeerReport { /// Extracts PeerScore from the Report fn get_score_from_report(&self) -> AppScore; } - -/// Example of negative PeerReport -#[derive(Debug, Clone)] -pub enum NegativePeerReport { - /// Worst offense, peer should likely be banned after this - Fatal, - /// Minor offense, deduct few points - Minor, - /// Major offense, deduct reasonable amount of points - Major, -} - -impl PeerReport for NegativePeerReport { - fn get_score_from_report(&self) -> AppScore { - match self { - Self::Fatal => -MAX_APP_SCORE - 10.0, - Self::Major => -10.0, - Self::Minor => -5.0, - } - } -} - -/// Example of positive PeerReport -#[derive(Debug, Clone)] -pub enum PositivePeerReport { - /// Minor positive feedback, increase reputation slightly - Minor, - /// Major positive feedback, increase reputation - Major, -} - -impl PeerReport for PositivePeerReport { - fn get_score_from_report(&self) -> AppScore { - match self { - Self::Major => 5.0, - Self::Minor => 1.0, - } - } -}