Skip to content

Commit

Permalink
Report Peers that give bad Block Info (#1331)
Browse files Browse the repository at this point in the history
First implementation of #943

During block import, if the peer gives incorrect values, dock their peer
score. If they successfully report a block, improve their peer score.
  • Loading branch information
MitchTurner authored Aug 31, 2023
1 parent 75e7e58 commit 87d54b2
Show file tree
Hide file tree
Showing 18 changed files with 581 additions and 147 deletions.
19 changes: 7 additions & 12 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,7 @@ Description of the upcoming release here.
- [#1286](https://github.com/FuelLabs/fuel-core/pull/1286): Include readable names for test cases where missing.
- [#1274](https://github.com/FuelLabs/fuel-core/pull/1274): Added tests to benchmark block synchronization.
- [#1263](https://github.com/FuelLabs/fuel-core/pull/1263): Add gas benchmarks for `ED19` and `ECR1` instructions.

#### Breaking

- [#1322](https://github.com/FuelLabs/fuel-core/pull/1322):
The `debug` flag is added to the CLI. The flag should be used for local development only. Enabling debug mode:
- Allows GraphQL Endpoints to arbitrarily advance blocks.
- Enables debugger GraphQL Endpoints.
- Allows setting `utxo_validation` to `false`.
- [#1331](https://github.com/FuelLabs/fuel-core/pull/1331): Add peer reputation reporting to block import code

### Changed

Expand All @@ -39,9 +32,11 @@ Description of the upcoming release here.
- [#1290](https://github.com/FuelLabs/fuel-core/pull/1290): Standardize CLI args to use `-` instead of `_`.
- [#1279](https://github.com/FuelLabs/fuel-core/pull/1279): Added a new CLI flag to enable the Relayer service `--enable-relayer`, and disabled the Relayer service by default. When supplying the `--enable-relayer` flag, the `--relayer` argument becomes mandatory, and omitting it is an error. Similarly, providing a `--relayer` argument without the `--enable-relayer` flag is an error. Lastly, providing the `--keypair` or `--network` arguments will also produce an error if the `--enable-p2p` flag is not set.
- [#1262](https://github.com/FuelLabs/fuel-core/pull/1262): The `ConsensusParameters` aggregates all configuration data related to the consensus. It contains many fields that are segregated by the usage. The API of some functions was affected to use lesser types instead the whole `ConsensusParameters`. It is a huge breaking change requiring repetitively monotonically updating all places that use the `ConsensusParameters`. But during updating, consider that maybe you can use lesser types. Usage of them may simplify signatures of methods and make them more user-friendly and transparent.

### Removed

#### Breaking
- [#1322](https://github.com/FuelLabs/fuel-core/pull/1322): The `manual_blocks_enabled` flag is removed from the CLI. The analog is a `debug` flag.
- [#1322](https://github.com/FuelLabs/fuel-core/pull/1322):
The `debug` flag is added to the CLI. The flag should be used for local development only. Enabling debug mode:
- Allows GraphQL Endpoints to arbitrarily advance blocks.
- Enables debugger GraphQL Endpoints.
- Allows setting `utxo_validation` to `false`.

### Removed
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 21 additions & 2 deletions crates/fuel-core/src/service/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::{
use fuel_core_consensus_module::block_verifier::Verifier;
use fuel_core_txpool::service::SharedState as TxPoolSharedState;
use fuel_core_types::fuel_types::BlockHeight;
#[cfg(feature = "p2p")]
use fuel_core_types::services::p2p::peer_reputation::AppScore;
use std::sync::Arc;

pub mod block_importer;
Expand Down Expand Up @@ -87,6 +89,17 @@ pub struct BlockImporterAdapter {
#[derive(Clone)]
pub struct P2PAdapter {
service: Option<fuel_core_p2p::service::SharedState>,
peer_report_config: PeerReportConfig,
}

#[cfg(feature = "p2p")]
#[derive(Clone)]
pub struct PeerReportConfig {
pub successful_block_import: AppScore,
pub missing_block_headers: AppScore,
pub bad_block_header: AppScore,
pub missing_transactions: AppScore,
pub invalid_transactions: AppScore,
}

#[cfg(not(feature = "p2p"))]
Expand All @@ -95,8 +108,14 @@ pub struct P2PAdapter;

#[cfg(feature = "p2p")]
impl P2PAdapter {
pub fn new(service: Option<fuel_core_p2p::service::SharedState>) -> Self {
Self { service }
pub fn new(
service: Option<fuel_core_p2p::service::SharedState>,
peer_report_config: PeerReportConfig,
) -> Self {
Self {
service,
peer_report_config,
}
}
}

Expand Down
79 changes: 61 additions & 18 deletions crates/fuel-core/src/service/adapters/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use fuel_core_services::stream::BoxStream;
use fuel_core_sync::ports::{
BlockImporterPort,
ConsensusPort,
PeerReportReason,
PeerToPeerPort,
};
use fuel_core_types::{
Expand All @@ -21,6 +22,10 @@ use fuel_core_types::{
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::{
peer_reputation::{
AppScore,
PeerReport,
},
PeerId,
SourcePeer,
},
Expand All @@ -46,25 +51,17 @@ impl PeerToPeerPort for P2PAdapter {
async fn get_sealed_block_headers(
&self,
block_range_height: Range<u32>,
) -> anyhow::Result<Option<Vec<SourcePeer<SealedBlockHeader>>>> {
) -> anyhow::Result<SourcePeer<Option<Vec<SealedBlockHeader>>>> {
if let Some(service) = &self.service {
Ok(service
.get_sealed_block_headers(block_range_height)
.await?
.and_then(|(peer_id, headers)| {
let peer_id: PeerId = peer_id.into();
headers.map(|headers| {
headers
.into_iter()
.map(|header| SourcePeer {
peer_id: peer_id.clone(),
data: header,
})
.collect()
})
}))
let (peer_id, headers) =
service.get_sealed_block_headers(block_range_height).await?;
let sourced_headers = SourcePeer {
peer_id: peer_id.into(),
data: headers,
};
Ok(sourced_headers)
} else {
Ok(None)
Err(anyhow::anyhow!("No P2P service available"))
}
}

Expand All @@ -81,11 +78,57 @@ impl PeerToPeerPort for P2PAdapter {
.get_transactions_from_peer(peer_id.into(), block)
.await
} else {
Ok(None)
Err(anyhow::anyhow!("No P2P service available"))
}
}

async fn report_peer(
&self,
peer: PeerId,
report: PeerReportReason,
) -> anyhow::Result<()> {
if let Some(service) = &self.service {
let service_name = "Sync";
let new_report = self.process_report(report);
service.report_peer(peer, new_report, service_name)?;
Ok(())
} else {
Err(anyhow::anyhow!("No P2P service available"))
}
}
}

impl P2PAdapter {
fn process_report(&self, reason: PeerReportReason) -> P2PAdapterPeerReport {
let score = match &reason {
PeerReportReason::SuccessfulBlockImport => {
self.peer_report_config.successful_block_import
}
PeerReportReason::MissingBlockHeaders => {
self.peer_report_config.missing_block_headers
}
PeerReportReason::BadBlockHeader => self.peer_report_config.bad_block_header,
PeerReportReason::MissingTransactions => {
self.peer_report_config.missing_transactions
}
PeerReportReason::InvalidTransactions => {
self.peer_report_config.invalid_transactions
}
};
P2PAdapterPeerReport { score }
}
}

struct P2PAdapterPeerReport {
score: AppScore,
}

impl PeerReport for P2PAdapterPeerReport {
fn get_score_from_report(&self) -> AppScore {
self.score
}
}

#[async_trait::async_trait]
impl BlockImporterPort for BlockImporterAdapter {
fn committed_height_stream(&self) -> BoxStream<BlockHeight> {
Expand Down
20 changes: 18 additions & 2 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,24 @@ pub fn init_sub_services(
};

#[cfg(feature = "p2p")]
let p2p_adapter =
P2PAdapter::new(network.as_ref().map(|network| network.shared.clone()));
let p2p_adapter = {
use crate::service::adapters::PeerReportConfig;

// Hardcoded for now, but left here to be configurable in the future.
// TODO: https://github.com/FuelLabs/fuel-core/issues/1340
let peer_report_config = PeerReportConfig {
successful_block_import: 5.,
missing_block_headers: -100.,
bad_block_header: -100.,
missing_transactions: -100.,
invalid_transactions: -100.,
};
P2PAdapter::new(
network.as_ref().map(|network| network.shared.clone()),
peer_report_config,
)
};

#[cfg(not(feature = "p2p"))]
let p2p_adapter = P2PAdapter::new();

Expand Down
4 changes: 2 additions & 2 deletions crates/services/p2p/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ impl ConnectionHandler for HeartbeatHandler {
Self::Error,
>,
> {
if let Some(inbound_block_height) = self.inbound.as_mut() {
match inbound_block_height.poll_unpin(cx) {
if let Some(inbound_stream_and_block_height) = self.inbound.as_mut() {
match inbound_stream_and_block_height.poll_unpin(cx) {
Poll::Ready(Err(_)) => {
debug!(target: "fuel-libp2p", "Incoming heartbeat errored");
self.inbound = None;
Expand Down
6 changes: 3 additions & 3 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ impl<Codec: NetworkCodec> FuelP2PService<Codec> {
Some(ResponseChannelItem::SealedHeaders(channel)),
Ok(ResponseMessage::SealedHeaders(headers)),
) => {
if channel.send(Some((peer, headers))).is_err() {
if channel.send((peer, headers)).is_err() {
debug!(
"Failed to send through the channel for {:?}",
request_id
Expand Down Expand Up @@ -1191,7 +1191,7 @@ mod tests {
}
tracing::info!("Node B Event: {:?}", node_b_event);
}
};
}
}
}

Expand Down Expand Up @@ -1595,7 +1595,7 @@ mod tests {

let expected = arbitrary_headers_for_range(range.clone());

if let Ok(Some((_, sealed_headers))) = response_message {
if let Ok((_, sealed_headers)) = response_message {
let check = expected.iter().zip(sealed_headers.unwrap().iter()).all(|(a, b)| eq_except_metadata(a, b));
let _ = tx_test_end.send(check).await;
} else {
Expand Down
4 changes: 2 additions & 2 deletions crates/services/p2p/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ impl PeerManager {
peer_id: &PeerId,
block_height: BlockHeight,
) {
if let Some(previous_heartbeat) = self
if let Some(time_elapsed) = self
.get_peer_info(peer_id)
.and_then(|info| info.heartbeat_data.seconds_since_last_heartbeat())
{
debug!(target: "fuel-p2p", "Previous hearbeat happened {:?} seconds ago", previous_heartbeat);
debug!(target: "fuel-p2p", "Previous heartbeat happened {:?} seconds ago", time_elapsed);
}

let heartbeat_data = HeartbeatData::new(block_height);
Expand Down
8 changes: 3 additions & 5 deletions crates/services/p2p/src/request_response/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &[u8] = b"/fuel/req_res/0.0.1";
/// Max Size in Bytes of the Request Message
pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::<RequestMessage>();

pub type ChannelItem<T> = oneshot::Sender<Option<T>>;

// Peer receives a `RequestMessage`.
// It prepares a response in form of `OutboundResponse`
// This `OutboundResponse` gets prepared to be sent over the wire in `NetworkResponse` format.
Expand Down Expand Up @@ -59,9 +57,9 @@ pub enum ResponseMessage {
/// Holds oneshot channels for specific responses
#[derive(Debug)]
pub enum ResponseChannelItem {
Block(ChannelItem<SealedBlock>),
SealedHeaders(ChannelItem<(PeerId, Option<Vec<SealedBlockHeader>>)>),
Transactions(ChannelItem<Vec<Transaction>>),
Block(oneshot::Sender<Option<SealedBlock>>),
SealedHeaders(oneshot::Sender<(PeerId, Option<Vec<SealedBlockHeader>>)>),
Transactions(oneshot::Sender<Option<Vec<Transaction>>>),
}

/// Response that is sent over the wire
Expand Down
6 changes: 3 additions & 3 deletions crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ enum TaskRequest {
},
GetSealedHeaders {
block_height_range: Range<u32>,
channel: oneshot::Sender<Option<(PeerId, Option<Vec<SealedBlockHeader>>)>>,
channel: oneshot::Sender<(PeerId, Option<Vec<SealedBlockHeader>>)>,
},
GetTransactions {
block_id: BlockId,
Expand Down Expand Up @@ -384,7 +384,7 @@ impl SharedState {
pub async fn get_sealed_block_headers(
&self,
block_height_range: Range<u32>,
) -> anyhow::Result<Option<(Vec<u8>, Option<Vec<SealedBlockHeader>>)>> {
) -> anyhow::Result<(Vec<u8>, Option<Vec<SealedBlockHeader>>)> {
let (sender, receiver) = oneshot::channel();

if block_height_range.is_empty() {
Expand All @@ -402,7 +402,7 @@ impl SharedState {

receiver
.await
.map(|o| o.map(|(peer_id, headers)| (peer_id.to_bytes(), headers)))
.map(|(peer_id, headers)| (peer_id.to_bytes(), headers))
.map_err(|e| anyhow!("{}", e))
}

Expand Down
1 change: 1 addition & 0 deletions crates/services/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ fuel-core-trace = { path = "../../trace" }
fuel-core-types = { path = "../../types", features = ["test-helpers"] }
mockall = { workspace = true }
test-case = { workspace = true }
tracing-subscriber = { workspace = true }

[features]
benchmarking = ["dep:mockall", "fuel-core-types/test-helpers"]
Loading

0 comments on commit 87d54b2

Please sign in to comment.