From 9c2e9cbd7c127fcb881d406fbf152bbe87ed10fe Mon Sep 17 00:00:00 2001
From: pompon0
Date: Thu, 17 Nov 2022 17:07:12 +0100
Subject: [PATCH 001/188] deflaked fix_local_edges test (#8079)
Fixed a flake in the fix_local_edges test that I managed to reproduce locally.
The event ConnectionClosed was being emitted too early: it should be sent only once the asynchronous call to unregister completes.
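As a reading aid, here is a minimal self-contained sketch of the ordering this patch enforces. It is not the nearcore API: `EventSink` and `unregister` are hypothetical stand-ins, and a plain thread stands in for the asynchronous unregister task. The point is that the event is pushed at the end of the cleanup work itself, so a test that waits for ConnectionClosed can no longer observe a half-unregistered connection.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Debug, PartialEq)]
enum Event {
    ConnectionClosed { stream_id: u64 },
}

#[derive(Default)]
struct EventSink(Mutex<Vec<Event>>);

impl EventSink {
    fn push(&self, ev: Event) {
        self.0.lock().unwrap().push(ev);
    }
}

// Hypothetical stand-in for NetworkState::unregister: the cleanup runs on its
// own thread, and ConnectionClosed is pushed only after the cleanup is done.
fn unregister(sink: Arc<EventSink>, stream_id: u64) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        // ... remove the connection from the shared network state here ...
        sink.push(Event::ConnectionClosed { stream_id });
    })
}

fn main() {
    let sink = Arc::new(EventSink::default());
    // Anyone who has observed the event is guaranteed that unregistration
    // has already completed.
    unregister(sink.clone(), 7).join().unwrap();
    assert_eq!(*sink.0.lock().unwrap(), vec![Event::ConnectionClosed { stream_id: 7 }]);
}
```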
---
chain/network/src/peer/peer_actor.rs | 47 ++++++++++++-------
.../src/peer_manager/network_state/mod.rs | 14 ++++--
chain/network/src/peer_manager/tests/nonce.rs | 2 +-
3 files changed, 41 insertions(+), 22 deletions(-)
diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs
index 0a32a294a2d..2a03727a612 100644
--- a/chain/network/src/peer/peer_actor.rs
+++ b/chain/network/src/peer/peer_actor.rs
@@ -92,6 +92,8 @@ pub(crate) enum ClosingReason {
PeerManager,
#[error("Received DisconnectMessage from peer")]
DisconnectMessage,
+ #[error("PeerActor stopped NOT via PeerActor::stop()")]
+ Unknown,
}
pub(crate) struct PeerActor {
@@ -1143,36 +1145,47 @@ impl actix::Actor for PeerActor {
}
fn stopping(&mut self, _: &mut Self::Context) -> Running {
+ Running::Stop
+ }
+
+ fn stopped(&mut self, _ctx: &mut Self::Context) {
+ // closing_reason may be None in case the whole actix system is stopped.
+ // It happens a lot in tests.
metrics::PEER_CONNECTIONS_TOTAL.dec();
debug!(target: "network", "{:?}: [status = {:?}] Peer {} disconnected.", self.my_node_info.id, self.peer_status, self.peer_info);
+ if self.closing_reason.is_none() {
+ // Due to Actix semantics, the closing reason may sometimes not be set.
+ // However, this is only expected to happen in tests.
+ tracing::error!(target:"network", "closing reason not set. This should happen only in tests.");
+ }
match &self.peer_status {
// If PeerActor is in Connecting state, then
// it was not registered in the NetworkState,
// so there is nothing to be done.
- PeerStatus::Connecting(..) => {}
+ PeerStatus::Connecting(..) => {
+ // TODO(gprusak): reporting the ConnectionClosed event is quite scattered right now and
+ // it is very ugly: it may happen here, in spawn_inner, or in NetworkState::unregister().
+ // Centralize it, once we get rid of actix.
+ self.network_state.config.event_sink.push(Event::ConnectionClosed(
+ ConnectionClosedEvent {
+ stream_id: self.stream_id,
+ reason: self.closing_reason.clone().unwrap_or(ClosingReason::Unknown),
+ },
+ ));
+ }
// Clean up the Connection from the NetworkState.
PeerStatus::Ready(conn) => {
let network_state = self.network_state.clone();
let clock = self.clock.clone();
let conn = conn.clone();
- let ban_reason = match self.closing_reason {
- Some(ClosingReason::Ban(reason)) => Some(reason),
- _ => None,
- };
- network_state.unregister(&clock, &conn, ban_reason);
+ network_state.unregister(
+ &clock,
+ &conn,
+ self.stream_id,
+ self.closing_reason.clone().unwrap_or(ClosingReason::Unknown),
+ );
}
}
- Running::Stop
- }
-
- fn stopped(&mut self, _ctx: &mut Self::Context) {
- // closing_reason may be None in case the whole actix system is stopped.
- // It happens a lot in tests.
- if let Some(reason) = self.closing_reason.take() {
- self.network_state.config.event_sink.push(Event::ConnectionClosed(
- ConnectionClosedEvent { stream_id: self.stream_id, reason },
- ));
- }
actix::Arbiter::current().stop();
}
}
diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs
index c7d814b88cb..1de33f4a32c 100644
--- a/chain/network/src/peer_manager/network_state/mod.rs
+++ b/chain/network/src/peer_manager/network_state/mod.rs
@@ -6,6 +6,7 @@ use crate::network_protocol::{
Edge, EdgeState, PartialEdgeInfo, PeerIdOrHash, PeerInfo, PeerMessage, Ping, Pong,
RawRoutedMessage, RoutedMessageBody, RoutedMessageV2, RoutingTableUpdate, SignedAccountData,
};
+use crate::peer::peer_actor::{ClosingReason, ConnectionClosedEvent};
use crate::peer_manager::connection;
use crate::peer_manager::peer_manager_actor::Event;
use crate::peer_manager::peer_store;
@@ -14,6 +15,7 @@ use crate::routing;
use crate::routing::routing_table_view::RoutingTableView;
use crate::stats::metrics;
use crate::store;
+use crate::tcp;
use crate::time;
use crate::types::{ChainInfo, PeerType, ReasonForBan};
use arc_swap::ArcSwap;
@@ -306,7 +308,8 @@ impl NetworkState {
self: &Arc<Self>,
clock: &time::Clock,
conn: &Arc<connection::Connection>,
- ban_reason: Option<ReasonForBan>,
+ stream_id: tcp::StreamId,
+ reason: ClosingReason,
) {
let this = self.clone();
let clock = clock.clone();
@@ -326,15 +329,18 @@ impl NetworkState {
}
// Save the fact that we are disconnecting to the PeerStore.
- let res = match ban_reason {
- Some(ban_reason) => {
+ let res = match reason {
+ ClosingReason::Ban(ban_reason) => {
this.peer_store.peer_ban(&clock, &conn.peer_info.id, ban_reason)
}
- None => this.peer_store.peer_disconnected(&clock, &conn.peer_info.id),
+ _ => this.peer_store.peer_disconnected(&clock, &conn.peer_info.id),
};
if let Err(err) = res {
tracing::error!(target: "network", ?err, "Failed to save peer data");
}
+ this.config
+ .event_sink
+ .push(Event::ConnectionClosed(ConnectionClosedEvent { stream_id, reason }));
});
}
diff --git a/chain/network/src/peer_manager/tests/nonce.rs b/chain/network/src/peer_manager/tests/nonce.rs
index 59e6ecc3385..1e6b18a4a32 100644
--- a/chain/network/src/peer_manager/tests/nonce.rs
+++ b/chain/network/src/peer_manager/tests/nonce.rs
@@ -57,7 +57,7 @@ async fn test_nonces() {
];
for test in test_cases {
- println!("Running test {:?}", test.2);
+ tracing::info!(target: "test", "Running test {:?}", test.2);
let cfg = peer::testonly::PeerConfig {
network: chain.make_config(rng),
chain: chain.clone(),
From be239bb72c9d61614565ce843e1a42ca78ed128d Mon Sep 17 00:00:00 2001
From: Alex Kladov
Date: Thu, 17 Nov 2022 16:26:08 +0000
Subject: [PATCH 002/188] Fix typo (#8080)
---
docs/practices/workflows/README.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/docs/practices/workflows/README.md b/docs/practices/workflows/README.md
index ab348153b0a..ec59d47388c 100644
--- a/docs/practices/workflows/README.md
+++ b/docs/practices/workflows/README.md
@@ -1,3 +1,3 @@
-= Workflows
+# Workflows
This chapter documents various ways you can run `neard` during development: running a local net, joining a test net, doing benchmarking and load testing.
From fdd55774348cc1e42b487db130318fddb295e329 Mon Sep 17 00:00:00 2001
From: Akhilesh Singhania
Date: Thu, 17 Nov 2022 17:54:07 +0100
Subject: [PATCH 003/188] docs: document the process for tracking security
sensitive issues (#8078)
https://docs.google.com/document/d/14w1Sdk1wXBTJIuzEksl1U1X_tMtvjn3fwdaeP_mIdPE/edit is the internal document being exported here.
---
docs/SUMMARY.md | 1 +
docs/practices/security_vulnerabilities.md | 46 ++++++++++++++++++++++
2 files changed, 47 insertions(+)
create mode 100644 docs/practices/security_vulnerabilities.md
diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md
index c2588ff8ee0..a6c0d0a890a 100644
--- a/docs/SUMMARY.md
+++ b/docs/SUMMARY.md
@@ -31,6 +31,7 @@
- [Code Style](./practices/style.md)
- [Documentation](./practices/docs.md)
- [Tracking Issues](./practices/tracking_issues.md)
+- [Security Vulnerabilities](./practices/security_vulnerabilities.md)
- [Fast Builds](./practices/fast_builds.md)
- [Testing](./practices/testing/README.md)
- [Python Tests](./practices/testing/python_tests.md)
diff --git a/docs/practices/security_vulnerabilities.md b/docs/practices/security_vulnerabilities.md
new file mode 100644
index 00000000000..4697bab3ef5
--- /dev/null
+++ b/docs/practices/security_vulnerabilities.md
@@ -0,0 +1,46 @@
+## Security Vulnerabilities
+
+
+The intended audience of the information presented here are developers working
+on the implementation of NEAR.
+
+Are you a security researcher? Please report security vulnerabilities to
+security@near.org.
+
+
+As nearcore is open source, all of its issues and pull requests are
publicly tracked on github. From time to time, however, a security sensitive
issue is discovered that cannot be tracked publicly on github. For such
issues we should keep the development process as similar to the usual one as
possible. To enable this, below is the high level process for working on
security sensitive issues.
+
+1. There is a [private fork of
+ nearcore](https://github.com/near/nearcore-private) on github. Access to
+ this repository is restricted to the set of people who are trusted to work on
+ and have knowledge about security sensitive issues pertaining to nearcore.
+
+ This repository can be manually synced with the public nearcore repository
+ using the following commands:
+
+ ```console
+ $ git remote add nearcore-public git@github.com:near/nearcore
+ $ git remote add nearcore-private git@github.com:near/nearcore-private
+ $ git fetch nearcore-public
+ $ git push nearcore-private nearcore-public/master:master
+ ```
+2. All security sensitive issues must be created on the private nearcore
repository. You must also assign one of the `[P-S0, P-S1]` labels to the
issue to indicate its severity. Judge the severity by two criteria: how easy
the attack is to carry out, and how large its impact would be. An attack that
is easy to mount or that can have a huge impact should get the `P-S0` label,
and `P-S1` otherwise.
+
+3. All security sensitive pull requests should also be created on the private
+ nearcore repository. Note that once a PR has been approved, it should not be
merged into the private repository. Instead, it should first be merged into
+ the public repository and then the private fork should be updated using the
+ steps above.
+
+4. Once work on a security issue is finished, it needs to be deployed to all the
+ impacted networks. Please contact the node team for help with this.
From 099aa532bddc89a6f1b4f83c532d6fe6d2c756eb Mon Sep 17 00:00:00 2001
From: robin-near <111538878+robin-near@users.noreply.github.com>
Date: Thu, 17 Nov 2022 09:34:56 -0800
Subject: [PATCH 004/188] Improve consensus performance when there are invalid
chunks. (#8060)
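This commit has no message body. At a high level, the client.rs hunks below add an epoch-scoped ban list: a block producer that detects an invalid chunk stops including chunks from that chunk's producer for the rest of the epoch. Below is a minimal sketch of that idea, using a plain HashSet and hypothetical `EpochId`/`AccountId` aliases in place of nearcore's types and its bounded LruCache.

```rust
use std::collections::HashSet;

// Hypothetical aliases standing in for nearcore's EpochId and AccountId types.
type EpochId = [u8; 32];
type AccountId = String;

/// Epoch-scoped ban list: producers of invalid chunks are skipped for the
/// remainder of the epoch. The real patch bounds the list with an LruCache
/// (NUM_EPOCH_CHUNK_PRODUCERS_TO_KEEP_IN_BLOCKLIST entries); a HashSet keeps
/// this sketch short.
#[derive(Default)]
struct ChunkProducerBanList {
    banned: HashSet<(EpochId, AccountId)>,
}

impl ChunkProducerBanList {
    fn ban(&mut self, epoch: EpochId, producer: AccountId) {
        self.banned.insert((epoch, producer));
    }

    /// Consulted before a ready chunk header is included in a block.
    fn is_banned(&self, epoch: &EpochId, producer: &AccountId) -> bool {
        self.banned.contains(&(*epoch, producer.clone()))
    }
}

fn main() {
    let mut bans = ChunkProducerBanList::default();
    let epoch = [0u8; 32];
    bans.ban(epoch, "malicious.near".to_string());
    assert!(bans.is_banned(&epoch, &"malicious.near".to_string()));
    assert!(!bans.is_banned(&epoch, &"honest.near".to_string()));
}
```

Keying the list by `(EpochId, AccountId)` means entries stop matching once a new epoch starts, so a ban expires on its own at the epoch boundary.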
---
chain/chain/src/block_processing_utils.rs | 3 +-
chain/chain/src/chain.rs | 64 +++-
chain/chain/src/tests/challenges.rs | 1 +
chain/chunks/src/client.rs | 19 +-
chain/chunks/src/lib.rs | 10 +-
chain/chunks/src/test_utils.rs | 2 +-
chain/client-primitives/src/debug.rs | 3 +
chain/client/src/client.rs | 147 +++++++-
chain/client/src/client_actor.rs | 12 +-
chain/client/src/debug.rs | 22 +-
chain/client/src/metrics.rs | 18 +
chain/client/src/test_utils.rs | 55 ++-
chain/jsonrpc/res/validator.html | 14 +-
.../src/tests/client/features.rs | 1 +
.../client/features/adversarial_behaviors.rs | 353 ++++++++++++++++++
.../src/tests/client/process_blocks.rs | 3 +-
.../src/tests/client/shards_manager.rs | 2 +-
17 files changed, 652 insertions(+), 77 deletions(-)
create mode 100644 integration-tests/src/tests/client/features/adversarial_behaviors.rs
diff --git a/chain/chain/src/block_processing_utils.rs b/chain/chain/src/block_processing_utils.rs
index a5211537e8e..ec2f6859f2a 100644
--- a/chain/chain/src/block_processing_utils.rs
+++ b/chain/chain/src/block_processing_utils.rs
@@ -4,7 +4,7 @@ use crate::Provenance;
use near_primitives::block::Block;
use near_primitives::challenge::{ChallengeBody, ChallengesResult};
use near_primitives::hash::CryptoHash;
-use near_primitives::sharding::{ReceiptProof, StateSyncInfo};
+use near_primitives::sharding::{ReceiptProof, ShardChunkHeader, StateSyncInfo};
use near_primitives::types::ShardId;
use once_cell::sync::OnceCell;
use std::collections::HashMap;
@@ -64,6 +64,7 @@ pub struct BlockProcessingArtifact {
pub orphans_missing_chunks: Vec<OrphanMissingChunks>,
pub blocks_missing_chunks: Vec<BlockMissingChunks>,
pub challenges: Vec<ChallengeBody>,
+ pub invalid_chunks: Vec<ShardChunkHeader>,
}
/// This struct defines the callback function that will be called after apply chunks are finished
diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs
index 467e1a76692..6929a992c59 100644
--- a/chain/chain/src/chain.rs
+++ b/chain/chain/src/chain.rs
@@ -130,7 +130,7 @@ const NEAR_BASE: Balance = 1_000_000_000_000_000_000_000_000;
/// has not been caught up yet, thus the two modes IsCaughtUp and NotCaughtUp.
/// CatchingUp is for when apply_chunks is called through catchup_blocks, this is to catch up the
/// shard states for the next epoch
-#[derive(Eq, PartialEq)]
+#[derive(Eq, PartialEq, Copy, Clone, Debug)]
enum ApplyChunksMode {
IsCaughtUp,
CatchingUp,
@@ -1937,6 +1937,7 @@ impl Chain {
&block,
&provenance,
&mut block_processing_artifact.challenges,
+ &mut block_processing_artifact.invalid_chunks,
block_received_time,
state_patch,
);
@@ -2113,6 +2114,14 @@ impl Chain {
let prev_head = self.store.head()?;
let provenance = block_preprocess_info.provenance.clone();
let block_start_processing_time = block_preprocess_info.block_start_processing_time.clone();
+ // TODO(#8055): this zip relies on the ordering of the apply_results.
+ for (apply_result, chunk) in apply_results.iter().zip(block.chunks().iter()) {
+ if let Err(err) = apply_result {
+ if err.is_bad_data() {
+ block_processing_artifacts.invalid_chunks.push(chunk.clone());
+ }
+ }
+ }
let new_head =
match self.postprocess_block_only(me, &block, block_preprocess_info, apply_results) {
Err(err) => {
@@ -2235,6 +2244,7 @@ impl Chain {
block: &MaybeValidated<Block>,
provenance: &Provenance,
challenges: &mut Vec<ChallengeBody>,
+ invalid_chunks: &mut Vec<ShardChunkHeader>,
block_received_time: Instant,
state_patch: SandboxStatePatch,
) -> Result<
@@ -2399,6 +2409,7 @@ impl Chain {
// otherwise put the block into the permanent storage, waiting for be caught up
if is_caught_up { ApplyChunksMode::IsCaughtUp } else { ApplyChunksMode::NotCaughtUp },
state_patch,
+ invalid_chunks,
)?;
Ok((
@@ -3241,6 +3252,7 @@ impl Chain {
&receipts_by_shard,
ApplyChunksMode::CatchingUp,
Default::default(),
+ &mut Vec::new(),
)?;
blocks_catch_up_state.scheduled_blocks.insert(pending_block);
block_catch_up_scheduler(BlockCatchUpRequest {
@@ -3571,14 +3583,12 @@ impl Chain {
incoming_receipts: &HashMap<ShardId, Vec<ReceiptProof>>,
mode: ApplyChunksMode,
mut state_patch: SandboxStatePatch,
+ invalid_chunks: &mut Vec<ShardChunkHeader>,
) -> Result<
Vec<Box<dyn FnOnce(&Span) -> Result<ApplyChunkResult, Error> + Send + 'static>>,
Error,
>
let _span = tracing::debug_span!(target: "chain", "apply_chunks_preprocessing").entered();
- let mut result: Vec<
- Box<dyn FnOnce(&Span) -> Result<ApplyChunkResult, Error> + Send + 'static>,
- > = Vec::new();
#[cfg(not(feature = "mock_node"))]
let protocol_version =
self.runtime_adapter.get_epoch_protocol_version(block.header().epoch_id())?;
@@ -3587,9 +3597,13 @@ impl Chain {
let will_shard_layout_change =
self.runtime_adapter.will_shard_layout_change_next_epoch(prev_hash)?;
let prev_chunk_headers = Chain::get_prev_chunk_headers(&*self.runtime_adapter, prev_block)?;
- for (shard_id, (chunk_header, prev_chunk_header)) in
- (block.chunks().iter().zip(prev_chunk_headers.iter())).enumerate()
- {
+ let mut process_one_chunk = |shard_id: usize,
+ chunk_header: &ShardChunkHeader,
+ prev_chunk_header: &ShardChunkHeader|
+ -> Result<
+ Option<Box<dyn FnOnce(&Span) -> Result<ApplyChunkResult, Error> + Send + 'static>>,
+ Error,
+ > {
// XXX: This is a bit questionable -- sandbox state patching works
// only for a single shard. This so far has been enough.
let state_patch = state_patch.take();
@@ -3747,7 +3761,7 @@ impl Chain {
let height = chunk_header.height_included();
let prev_block_hash = chunk_header.prev_block_hash().clone();
- result.push(Box::new(move |parent_span| -> Result<ApplyChunkResult, Error> {
+ Ok(Some(Box::new(move |parent_span| -> Result<ApplyChunkResult, Error> {
let _span = tracing::debug_span!(
target: "chain",
parent: parent_span,
@@ -3796,7 +3810,7 @@ impl Chain {
}
Err(err) => Err(err),
}
- }));
+ })))
} else {
let new_extra = self.get_chunk_extra(prev_block.hash(), &shard_uid)?.clone();
@@ -3809,7 +3823,7 @@ impl Chain {
let height = block.header().height();
let prev_block_hash = *prev_block.hash();
- result.push(Box::new(move |parent_span| -> Result<ApplyChunkResult, Error> {
+ Ok(Some(Box::new(move |parent_span| -> Result<ApplyChunkResult, Error> {
let _span = tracing::debug_span!(
target: "chain",
parent: parent_span,
@@ -3856,7 +3870,7 @@ impl Chain {
}
Err(err) => Err(err),
}
- }));
+ })))
}
} else if let Some(split_state_roots) = split_state_roots {
// case 3)
@@ -3869,7 +3883,7 @@ impl Chain {
self.store().get_state_changes_for_split_states(block.hash(), shard_id)?;
let runtime_adapter = self.runtime_adapter.clone();
let block_hash = *block.hash();
- result.push(Box::new(move |parent_span| -> Result<ApplyChunkResult, Error> {
+ Ok(Some(Box::new(move |parent_span| -> Result<ApplyChunkResult, Error> {
let _span = tracing::debug_span!(
target: "chain",
parent: parent_span,
@@ -3886,11 +3900,29 @@ impl Chain {
state_changes,
)?,
}))
- }));
+ })))
+ } else {
+ Ok(None)
}
- }
-
- Ok(result)
+ };
+ block
+ .chunks()
+ .iter()
+ .zip(prev_chunk_headers.iter())
+ .enumerate()
+ .filter_map(|(shard_id, (chunk_header, prev_chunk_header))| {
+ match process_one_chunk(shard_id, chunk_header, prev_chunk_header) {
+ Ok(Some(processor)) => Some(Ok(processor)),
+ Ok(None) => None,
+ Err(err) => {
+ if err.is_bad_data() {
+ invalid_chunks.push(chunk_header.clone());
+ }
+ Some(Err(err))
+ }
+ }
+ })
+ .collect()
}
}
diff --git a/chain/chain/src/tests/challenges.rs b/chain/chain/src/tests/challenges.rs
index 84b47cd7ea9..885eac89c76 100644
--- a/chain/chain/src/tests/challenges.rs
+++ b/chain/chain/src/tests/challenges.rs
@@ -52,6 +52,7 @@ fn challenges_new_head_prev() {
&MaybeValidated::from(last_block),
&Provenance::NONE,
&mut vec![],
+ &mut vec![],
Clock::instant(),
Default::default(),
) {
diff --git a/chain/chunks/src/client.rs b/chain/chunks/src/client.rs
index 0fa27d7d22f..72824875d29 100644
--- a/chain/chunks/src/client.rs
+++ b/chain/chunks/src/client.rs
@@ -8,7 +8,7 @@ use near_primitives::{
epoch_manager::RngSeed,
sharding::{EncodedShardChunk, PartialEncodedChunk, ShardChunk, ShardChunkHeader},
transaction::SignedTransaction,
- types::ShardId,
+ types::{AccountId, ShardId},
};
pub trait ClientAdapterForShardsManager {
@@ -18,7 +18,11 @@ pub trait ClientAdapterForShardsManager {
shard_chunk: Option<ShardChunk>,
);
fn saw_invalid_chunk(&self, chunk: EncodedShardChunk);
- fn chunk_header_ready_for_inclusion(&self, chunk_header: ShardChunkHeader);
+ fn chunk_header_ready_for_inclusion(
+ &self,
+ chunk_header: ShardChunkHeader,
+ chunk_producer: AccountId,
+ );
}
#[derive(Message)]
@@ -26,7 +30,7 @@ pub trait ClientAdapterForShardsManager {
pub enum ShardsManagerResponse {
ChunkCompleted { partial_chunk: PartialEncodedChunk, shard_chunk: Option<ShardChunk> },
InvalidChunk(EncodedShardChunk),
- ChunkHeaderReadyForInclusion(ShardChunkHeader),
+ ChunkHeaderReadyForInclusion { chunk_header: ShardChunkHeader, chunk_producer: AccountId },
}
impl>> ClientAdapterForShardsManager for A {
@@ -43,9 +47,14 @@ impl>> ClientAdapterForSh
fn saw_invalid_chunk(&self, chunk: EncodedShardChunk) {
self.do_send(ShardsManagerResponse::InvalidChunk(chunk).with_span_context());
}
- fn chunk_header_ready_for_inclusion(&self, chunk_header: ShardChunkHeader) {
+ fn chunk_header_ready_for_inclusion(
+ &self,
+ chunk_header: ShardChunkHeader,
+ chunk_producer: AccountId,
+ ) {
self.do_send(
- ShardsManagerResponse::ChunkHeaderReadyForInclusion(chunk_header).with_span_context(),
+ ShardsManagerResponse::ChunkHeaderReadyForInclusion { chunk_header, chunk_producer }
+ .with_span_context(),
);
}
}
diff --git a/chain/chunks/src/lib.rs b/chain/chunks/src/lib.rs
index ae9067416e4..02534b18bcf 100644
--- a/chain/chunks/src/lib.rs
+++ b/chain/chunks/src/lib.rs
@@ -975,11 +975,12 @@ impl ShardsManager {
.collect()
}
+ /// Returns true if we were able to answer the request. This is for testing.
pub fn process_partial_encoded_chunk_request(
&mut self,
request: PartialEncodedChunkRequestMsg,
route_back: CryptoHash,
- ) {
+ ) -> bool {
let _span = tracing::debug_span!(
target: "chunks",
"process_partial_encoded_chunk_request",
@@ -1005,8 +1006,10 @@ impl ShardsManager {
NetworkRequests::PartialEncodedChunkResponse { route_back, response },
)
.with_span_context(),
- )
+ );
+ return true;
}
+ false
}
fn prepare_partial_encoded_chunk_response(
@@ -1799,7 +1802,8 @@ impl ShardsManager {
if have_all_parts && self.seals_mgr.should_trust_chunk_producer(&chunk_producer) {
if self.encoded_chunks.mark_chunk_for_inclusion(&chunk_hash) {
- self.client_adapter.chunk_header_ready_for_inclusion(header.clone());
+ self.client_adapter
+ .chunk_header_ready_for_inclusion(header.clone(), chunk_producer);
}
}
// we can safely unwrap here because we already checked that chunk_hash exist in encoded_chunks
diff --git a/chain/chunks/src/test_utils.rs b/chain/chunks/src/test_utils.rs
index e2717b2f51d..157c422ee07 100644
--- a/chain/chunks/src/test_utils.rs
+++ b/chain/chunks/src/test_utils.rs
@@ -332,7 +332,7 @@ impl ChunkTestFixture {
pub fn count_chunk_ready_for_inclusion_messages(&self) -> usize {
let mut chunks_ready = 0;
while let Some(message) = self.mock_client_adapter.pop() {
- if let ShardsManagerResponse::ChunkHeaderReadyForInclusion(_) = message {
+ if let ShardsManagerResponse::ChunkHeaderReadyForInclusion { .. } = message {
chunks_ready += 1;
}
}
diff --git a/chain/client-primitives/src/debug.rs b/chain/client-primitives/src/debug.rs
index d5d52bb5065..62139c02005 100644
--- a/chain/client-primitives/src/debug.rs
+++ b/chain/client-primitives/src/debug.rs
@@ -6,6 +6,7 @@ use std::collections::HashMap;
use crate::types::StatusError;
use actix::Message;
use chrono::DateTime;
+use near_primitives::types::EpochId;
use near_primitives::views::{
CatchupStatusView, ChainProcessingInfo, EpochValidatorInfo, RequestedStatePartsView,
SyncStatusView,
@@ -165,6 +166,8 @@ pub struct ValidatorStatus {
// Sorted by block height inversely (high to low)
// The range of heights are controlled by constants in client_actor.rs
pub production: Vec<(BlockHeight, ProductionAtHeight)>,
+ // Chunk producers that this node has banned.
+ pub banned_chunk_producers: Vec<(EpochId, Vec<AccountId>)>,
}
// Different debug requests that can be sent by HTML pages, via GET.
diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs
index 3ec8fb736d7..e929513e89a 100644
--- a/chain/client/src/client.rs
+++ b/chain/client/src/client.rs
@@ -62,6 +62,7 @@ use near_primitives::views::{CatchupStatusView, DroppedReason};
const NUM_REBROADCAST_BLOCKS: usize = 30;
const CHUNK_HEADERS_FOR_INCLUSION_CACHE_SIZE: usize = 2048;
+const NUM_EPOCH_CHUNK_PRODUCERS_TO_KEEP_IN_BLOCKLIST: usize = 1000;
/// The time we wait for the response to an Epoch Sync request before retrying
// TODO #3488 set 30_000
@@ -81,6 +82,10 @@ pub struct Client {
pub adv_produce_blocks: bool,
#[cfg(feature = "test_features")]
pub adv_produce_blocks_only_valid: bool,
+ #[cfg(feature = "test_features")]
+ pub produce_invalid_chunks: bool,
+ #[cfg(feature = "test_features")]
+ pub produce_invalid_tx_in_chunks: bool,
/// Fast Forward accrued delta height used to calculate fast forwarded timestamps for each block.
#[cfg(feature = "sandbox")]
@@ -94,8 +99,11 @@ pub struct Client {
pub shards_mgr: ShardsManager,
me: Option<AccountId>,
pub sharded_tx_pool: ShardedTransactionPool,
- prev_block_to_chunk_headers_ready_for_inclusion:
- LruCache<CryptoHash, HashMap<ShardId, (ShardChunkHeader, chrono::DateTime<chrono::Utc>)>>,
+ prev_block_to_chunk_headers_ready_for_inclusion: LruCache<
+ CryptoHash,
+ HashMap<ShardId, (ShardChunkHeader, chrono::DateTime<chrono::Utc>, AccountId)>,
+ >,
+ pub do_not_include_chunks_from: LruCache<(EpochId, AccountId), ()>,
/// Network adapter.
network_adapter: Arc<dyn PeerManagerAdapter>,
/// Signer for block producer (if present).
@@ -237,6 +245,10 @@ impl Client {
adv_produce_blocks: false,
#[cfg(feature = "test_features")]
adv_produce_blocks_only_valid: false,
+ #[cfg(feature = "test_features")]
+ produce_invalid_chunks: false,
+ #[cfg(feature = "test_features")]
+ produce_invalid_tx_in_chunks: false,
#[cfg(feature = "sandbox")]
accrued_fastforward_delta: 0,
config,
@@ -250,6 +262,9 @@ impl Client {
prev_block_to_chunk_headers_ready_for_inclusion: LruCache::new(
CHUNK_HEADERS_FOR_INCLUSION_CACHE_SIZE,
),
+ do_not_include_chunks_from: LruCache::new(
+ NUM_EPOCH_CHUNK_PRODUCERS_TO_KEEP_IN_BLOCKLIST,
+ ),
network_adapter,
validator_signer,
pending_approvals: lru::LruCache::new(num_block_producer_seats),
@@ -408,19 +423,49 @@ impl Client {
pub fn get_chunk_headers_ready_for_inclusion(
&self,
+ epoch_id: &EpochId,
prev_block_hash: &CryptoHash,
- ) -> HashMap<ShardId, (ShardChunkHeader, chrono::DateTime<chrono::Utc>)> {
+ ) -> HashMap<ShardId, (ShardChunkHeader, chrono::DateTime<chrono::Utc>, AccountId)> {
self.prev_block_to_chunk_headers_ready_for_inclusion
.peek(prev_block_hash)
.cloned()
.unwrap_or_default()
+ .into_iter()
+ .filter(|(_, (chunk_header, _, chunk_producer))| {
+ let banned = self
+ .do_not_include_chunks_from
+ .contains(&(epoch_id.clone(), chunk_producer.clone()));
+ if banned {
+ warn!(
+ target: "client",
+ "Not including chunk {:?} from banned validator {}",
+ chunk_header.chunk_hash(),
+ chunk_producer);
+ metrics::CHUNK_DROPPED_BECAUSE_OF_BANNED_CHUNK_PRODUCER.inc();
+ }
+ !banned
+ })
+ .collect()
}
- pub fn get_num_chunks_ready_for_inclusion(&self, prev_block_hash: &CryptoHash) -> usize {
- self.prev_block_to_chunk_headers_ready_for_inclusion
- .peek(prev_block_hash)
- .map(|x| x.len())
- .unwrap_or(0)
+ pub fn num_chunk_headers_ready_for_inclusion(
+ &self,
+ epoch_id: &EpochId,
+ prev_block_hash: &CryptoHash,
+ ) -> usize {
+ let entries =
+ match self.prev_block_to_chunk_headers_ready_for_inclusion.peek(prev_block_hash) {
+ Some(entries) => entries,
+ None => return 0,
+ };
+ entries
+ .values()
+ .filter(|(_, _, chunk_producer)| {
+ !self
+ .do_not_include_chunks_from
+ .contains(&(epoch_id.clone(), chunk_producer.clone()))
+ })
+ .count()
}
/// Produce block if we are block producer for given `next_height` block height.
@@ -485,7 +530,7 @@ impl Client {
}
}
- let new_chunks = self.get_chunk_headers_ready_for_inclusion(&prev_hash);
+ let new_chunks = self.get_chunk_headers_ready_for_inclusion(&epoch_id, &prev_hash);
debug!(target: "client", "{:?} Producing block at height {}, parent {} @ {}, {} new chunks", validator_signer.validator_id(),
next_height, prev.height(), format_hash(head.last_block_hash), new_chunks.len());
@@ -577,7 +622,7 @@ impl Client {
);
// Collect new chunks.
- for (shard_id, (mut chunk_header, _)) in new_chunks {
+ for (shard_id, (mut chunk_header, _, _)) in new_chunks {
*chunk_header.height_included_mut() = next_height;
chunks[shard_id as usize] = chunk_header;
}
@@ -700,6 +745,13 @@ impl Client {
let prev_block_header = self.chain.get_block_header(&prev_block_hash)?;
let transactions = self.prepare_transactions(shard_id, &chunk_extra, &prev_block_header)?;
+ let transactions = transactions;
+ #[cfg(feature = "test_features")]
+ let transactions = Self::maybe_insert_invalid_transaction(
+ transactions,
+ prev_block_hash,
+ self.produce_invalid_tx_in_chunks,
+ );
let num_filtered_transactions = transactions.len();
let (tx_root, _) = merklize(&transactions);
let outgoing_receipts = self.chain.get_outgoing_receipts_for_shard(
@@ -726,13 +778,16 @@ impl Client {
let (outgoing_receipts_root, _) = merklize(&outgoing_receipts_hashes);
let protocol_version = self.runtime_adapter.get_epoch_protocol_version(epoch_id)?;
+ let gas_used = chunk_extra.gas_used();
+ #[cfg(feature = "test_features")]
+ let gas_used = if self.produce_invalid_chunks { gas_used + 1 } else { gas_used };
let (encoded_chunk, merkle_paths) = ShardsManager::create_encoded_shard_chunk(
prev_block_hash,
*chunk_extra.state_root(),
*chunk_extra.outcome_root(),
next_height,
shard_id,
- chunk_extra.gas_used(),
+ gas_used,
chunk_extra.gas_limit(),
chunk_extra.balance_burnt(),
chunk_extra.validator_proposals().collect(),
@@ -768,6 +823,27 @@ impl Client {
Ok(Some((encoded_chunk, merkle_paths, outgoing_receipts)))
}
+ #[cfg(feature = "test_features")]
+ fn maybe_insert_invalid_transaction(
+ mut txs: Vec<SignedTransaction>,
+ prev_block_hash: CryptoHash,
+ insert: bool,
+ ) -> Vec<SignedTransaction> {
+ if insert {
+ txs.push(SignedTransaction::new(
+ near_crypto::Signature::empty(near_crypto::KeyType::ED25519),
+ near_primitives::transaction::Transaction::new(
+ "test".parse().unwrap(),
+ near_crypto::PublicKey::empty(near_crypto::KeyType::SECP256K1),
+ "other".parse().unwrap(),
+ 3,
+ prev_block_hash,
+ ),
+ ));
+ }
+ txs
+ }
+
/// Prepares an ordered list of valid transactions from the pool up the limits.
fn prepare_transactions(
&mut self,
@@ -1097,8 +1173,12 @@ impl Client {
&mut self,
block_processing_artifacts: BlockProcessingArtifact,
) {
- let BlockProcessingArtifact { orphans_missing_chunks, blocks_missing_chunks, challenges } =
- block_processing_artifacts;
+ let BlockProcessingArtifact {
+ orphans_missing_chunks,
+ blocks_missing_chunks,
+ challenges,
+ invalid_chunks,
+ } = block_processing_artifacts;
// Send out challenges that accumulated via on_challenge.
self.send_challenges(challenges);
// For any missing chunk, call the ShardsManager with it so that it may apply forwarded parts.
@@ -1117,6 +1197,34 @@ impl Client {
}
// Request any (still) missing chunks.
self.request_missing_chunks(blocks_missing_chunks, orphans_missing_chunks);
+
+ for chunk_header in invalid_chunks {
+ if let Err(err) = self.ban_chunk_producer_for_producing_invalid_chunk(chunk_header) {
+ error!(target: "client", "Failed to ban chunk producer for producing invalid chunk: {:?}", err);
+ }
+ }
+ }
+
+ fn ban_chunk_producer_for_producing_invalid_chunk(
+ &mut self,
+ chunk_header: ShardChunkHeader,
+ ) -> Result<(), Error> {
+ let epoch_id =
+ self.runtime_adapter.get_epoch_id_from_prev_block(chunk_header.prev_block_hash())?;
+ let chunk_producer = self.runtime_adapter.get_chunk_producer(
+ &epoch_id,
+ chunk_header.height_created(),
+ chunk_header.shard_id(),
+ )?;
+ error!(
+ target: "client",
+ "Banning chunk producer {} for epoch {:?} for producing invalid chunk {:?}",
+ chunk_producer,
+ epoch_id,
+ chunk_header.chunk_hash());
+ metrics::CHUNK_PRODUCER_BANNED_FOR_EPOCH.inc();
+ self.do_not_include_chunks_from.put((epoch_id, chunk_producer), ());
+ Ok(())
}
fn rebroadcast_block(&mut self, block: &Block) {
@@ -1158,14 +1266,18 @@ impl Client {
}
}
- pub fn on_chunk_header_ready_for_inclusion(&mut self, chunk_header: ShardChunkHeader) {
+ pub fn on_chunk_header_ready_for_inclusion(
+ &mut self,
+ chunk_header: ShardChunkHeader,
+ chunk_producer: AccountId,
+ ) {
let prev_block_hash = chunk_header.prev_block_hash();
self.prev_block_to_chunk_headers_ready_for_inclusion
.get_or_insert(prev_block_hash.clone(), || HashMap::new());
self.prev_block_to_chunk_headers_ready_for_inclusion
.get_mut(prev_block_hash)
.unwrap()
- .insert(chunk_header.shard_id(), (chunk_header, chrono::Utc::now()));
+ .insert(chunk_header.shard_id(), (chunk_header, chrono::Utc::now(), chunk_producer));
}
pub fn sync_block_headers(
@@ -1468,7 +1580,10 @@ impl Client {
self.runtime_adapter.as_ref(),
)?;
persist_chunk(partial_chunk.clone(), Some(shard_chunk), self.chain.mut_store())?;
- self.on_chunk_header_ready_for_inclusion(encoded_chunk.cloned_header());
+ self.on_chunk_header_ready_for_inclusion(
+ encoded_chunk.cloned_header(),
+ self.me.clone().unwrap(),
+ );
self.shards_mgr.distribute_encoded_chunk(
partial_chunk,
encoded_chunk,
diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs
index d9d87a8eb2f..e3c1aa55693 100644
--- a/chain/client/src/client_actor.rs
+++ b/chain/client/src/client_actor.rs
@@ -1152,8 +1152,9 @@ impl ClientActor {
self.client.runtime_adapter.get_block_producer(&epoch_id, height)?;
if me == next_block_producer_account {
- let num_chunks =
- self.client.get_num_chunks_ready_for_inclusion(&head.last_block_hash);
+ let num_chunks = self
+ .client
+ .num_chunk_headers_ready_for_inclusion(&epoch_id, &head.last_block_hash);
let have_all_chunks = head.height == 0
|| num_chunks as u64
== self.client.runtime_adapter.num_shards(&epoch_id).unwrap();
@@ -2042,8 +2043,11 @@ impl Handler> for ClientActor {
ShardsManagerResponse::InvalidChunk(encoded_chunk) => {
self.client.on_invalid_chunk(encoded_chunk);
}
- ShardsManagerResponse::ChunkHeaderReadyForInclusion(chunk_header) => {
- self.client.on_chunk_header_ready_for_inclusion(chunk_header);
+ ShardsManagerResponse::ChunkHeaderReadyForInclusion {
+ chunk_header,
+ chunk_producer,
+ } => {
+ self.client.on_chunk_header_ready_for_inclusion(chunk_header, chunk_producer);
}
}
}
diff --git a/chain/client/src/debug.rs b/chain/client/src/debug.rs
index f338d64db04..dbd06d25ad6 100644
--- a/chain/client/src/debug.rs
+++ b/chain/client/src/debug.rs
@@ -3,6 +3,7 @@
use crate::ClientActor;
use actix::{Context, Handler};
use borsh::BorshSerialize;
+use itertools::Itertools;
use near_chain::crypto_hash_timer::CryptoHashTimer;
use near_chain::{near_chain_primitives, ChainStoreAccess, RuntimeAdapter};
use near_client_primitives::debug::{
@@ -113,19 +114,14 @@ impl BlockProductionTracker {
block_height: BlockHeight,
epoch_id: &EpochId,
num_shards: ShardId,
- new_chunks: &HashMap<ShardId, (ShardChunkHeader, DateTime<chrono::Utc>)>,
+ new_chunks: &HashMap<ShardId, (ShardChunkHeader, DateTime<chrono::Utc>, AccountId)>,
runtime_adapter: &dyn RuntimeAdapter,
) -> Result<Vec<ChunkCollection>, Error> {
let mut chunk_collection_info = vec![];
for shard_id in 0..num_shards {
- if let Some((new_chunk, chunk_time)) = new_chunks.get(&shard_id) {
- let chunk_producer = runtime_adapter.get_chunk_producer(
- epoch_id,
- new_chunk.height_created(),
- shard_id,
- )?;
+ if let Some((_, chunk_time, chunk_producer)) = new_chunks.get(&shard_id) {
chunk_collection_info.push(ChunkCollection {
- chunk_producer,
+ chunk_producer: chunk_producer.clone(),
received_time: Some(chunk_time.clone()),
chunk_included: true,
});
@@ -618,6 +614,16 @@ impl ClientActor {
shards: self.client.runtime_adapter.num_shards(&head.epoch_id).unwrap_or_default(),
approval_history: self.client.doomslug.get_approval_history(),
production: productions,
+ banned_chunk_producers: self
+ .client
+ .do_not_include_chunks_from
+ .iter()
+ .map(|(k, _)| k.clone())
+ .sorted()
+ .group_by(|(k, _)| k.clone())
+ .into_iter()
+ .map(|(k, vs)| (k, vs.map(|(_, v)| v).collect()))
+ .collect(),
})
}
}
diff --git a/chain/client/src/metrics.rs b/chain/client/src/metrics.rs
index 9949033e2c0..09231d21d60 100644
--- a/chain/client/src/metrics.rs
+++ b/chain/client/src/metrics.rs
@@ -151,6 +151,24 @@ pub(crate) static CHUNK_SKIPPED_TOTAL: Lazy = Lazy::new(|| {
.unwrap()
});
+pub(crate) static CHUNK_PRODUCER_BANNED_FOR_EPOCH: Lazy<IntCounter> = Lazy::new(|| {
+ try_create_int_counter(
+ "near_chunk_producer_banned_for_epoch",
+ "Number of times we have banned a chunk producer for an epoch",
+ )
+ .unwrap()
+});
+
+pub(crate) static CHUNK_DROPPED_BECAUSE_OF_BANNED_CHUNK_PRODUCER: Lazy<IntCounter> =
+ Lazy::new(|| {
+ try_create_int_counter(
+ "near_chunk_dropped_because_of_banned_chunk_producer",
+ "Number of chunks we, as a block producer,
+ dropped, because the chunk is produced by a banned chunk producer",
+ )
+ .unwrap()
+ });
+
pub(crate) static PARTIAL_ENCODED_CHUNK_RESPONSE_DELAY: Lazy<Histogram> = Lazy::new(|| {
try_create_histogram(
"near_partial_encoded_chunk_response_delay",
diff --git a/chain/client/src/test_utils.rs b/chain/client/src/test_utils.rs
index ea2c56c7b29..814e97ef9c0 100644
--- a/chain/client/src/test_utils.rs
+++ b/chain/client/src/test_utils.rs
@@ -11,7 +11,7 @@ use futures::{future, FutureExt};
use num_rational::Ratio;
use once_cell::sync::OnceCell;
use rand::{thread_rng, Rng};
-use tracing::info;
+use tracing::{info, warn};
use crate::{start_view_client, Client, ClientActor, SyncStatus, ViewClientActor};
use near_chain::chain::{do_apply_chunks, BlockCatchUpRequest, StateSplitRequest};
@@ -173,8 +173,7 @@ impl Client {
/// has started.
pub fn finish_block_in_processing(&mut self, hash: &CryptoHash) -> Vec<CryptoHash> {
if let Ok(()) = wait_for_block_in_processing(&mut self.chain, hash) {
- let (accepted_blocks, errors) = self.postprocess_ready_blocks(Arc::new(|_| {}), true);
- assert!(errors.is_empty());
+ let (accepted_blocks, _) = self.postprocess_ready_blocks(Arc::new(|_| {}), true);
return accepted_blocks;
}
vec![]
@@ -1429,7 +1428,12 @@ impl TestEnv {
{
let target_id = self.account_to_client_index[&target.account_id.unwrap()];
let response = self.get_partial_encoded_chunk_response(target_id, request);
- self.clients[id].shards_mgr.process_partial_encoded_chunk_response(response).unwrap();
+ if let Some(response) = response {
+ self.clients[id]
+ .shards_mgr
+ .process_partial_encoded_chunk_response(response)
+ .unwrap();
+ }
} else {
panic!("The request is not a PartialEncodedChunk request {:?}", request);
}
@@ -1439,24 +1443,33 @@ impl TestEnv {
&mut self,
id: usize,
request: PartialEncodedChunkRequestMsg,
- ) -> PartialEncodedChunkResponseMsg {
+ ) -> Option<PartialEncodedChunkResponseMsg> {
let client = &mut self.clients[id];
- client.shards_mgr.process_partial_encoded_chunk_request(request, CryptoHash::default());
- let response = self.network_adapters[id].pop_most_recent().unwrap();
- if let PeerManagerMessageRequest::NetworkRequests(
- NetworkRequests::PartialEncodedChunkResponse { route_back: _, response },
- ) = response
+ if client
+ .shards_mgr
+ .process_partial_encoded_chunk_request(request.clone(), CryptoHash::default())
{
- return response;
+ let response = self.network_adapters[id].pop_most_recent().unwrap();
+ if let PeerManagerMessageRequest::NetworkRequests(
+ NetworkRequests::PartialEncodedChunkResponse { route_back: _, response },
+ ) = response
+ {
+ Some(response)
+ } else {
+ panic!(
+ "did not find PartialEncodedChunkResponse from the network queue {:?}",
+ response
+ );
+ }
} else {
- panic!(
- "did not find PartialEncodedChunkResponse from the network queue {:?}",
- response
- );
+ // TODO: Somehow this may fail at epoch boundaries. Figure out why.
+ warn!("Failed to process PartialEncodedChunkRequest from client {}: {:?}", id, request);
+ None
}
}
- pub fn process_shards_manager_responses(&mut self, id: usize) {
+ pub fn process_shards_manager_responses(&mut self, id: usize) -> bool {
+ let mut any_processed = false;
while let Some(msg) = self.client_adapters[id].pop() {
match msg {
ShardsManagerResponse::ChunkCompleted { partial_chunk, shard_chunk } => {
@@ -1469,11 +1482,17 @@ impl TestEnv {
ShardsManagerResponse::InvalidChunk(encoded_chunk) => {
self.clients[id].on_invalid_chunk(encoded_chunk);
}
- ShardsManagerResponse::ChunkHeaderReadyForInclusion(header) => {
- self.clients[id].on_chunk_header_ready_for_inclusion(header);
+ ShardsManagerResponse::ChunkHeaderReadyForInclusion {
+ chunk_header,
+ chunk_producer,
+ } => {
+ self.clients[id]
+ .on_chunk_header_ready_for_inclusion(chunk_header, chunk_producer);
}
}
+ any_processed = true;
}
+ any_processed
}
pub fn process_shards_manager_responses_and_finish_processing_blocks(&mut self, idx: usize) {
diff --git a/chain/jsonrpc/res/validator.html b/chain/jsonrpc/res/validator.html
index 52bd654d406..770ab855565 100644
--- a/chain/jsonrpc/res/validator.html
+++ b/chain/jsonrpc/res/validator.html
@@ -346,7 +346,7 @@
chunk_cell.append("F + " + time_delta + "ms
")
if (thresholdApprovalTime != null) {
let time_since_threshold = Date.parse(chunk_collection_time) - thresholdApprovalTime;
- let text = $("T + " + time_since_threshold+ "ms
");
+ let text = $("T + " + time_since_threshold + "ms
");
if (time_since_threshold > 300) {
text.addClass('chunk-delay-red')
} else if (time_since_threshold > 150) {
@@ -389,6 +389,12 @@
$('.js-tbody-production').append($("HEAD |
"));
}
+ for (let [epoch_id, chunk_producers] of data.status_response.ValidatorStatus.banned_chunk_producers) {
+ $('#banned-chunk-producers').append(
+ $('')
+ .text('Banned chunk producers for epoch ' + epoch_id + ': ' + chunk_producers.join(', '))
+ );
+ }
};
$(document).ready(() => {
@@ -426,7 +432,8 @@
Shards can either be produced by this validator (marked as 'ME') or received from other
validators.
Shards that have missing chunks are marked as grey.
- We also mark shards arrival time in color. Shards that are delayed by more than 150ms after T are marked as orange,
+ We also mark shards arrival time in color. Shards that are delayed by more than 150ms after T are marked as
+ orange,
and ones delayed more than 300 marked as red.
Approvals
A green field means that the validator endorses the PREVIOUS block.
@@ -434,6 +441,7 @@
Other colors mean different amounts of skips.
+