diff --git a/Cargo.lock b/Cargo.lock index bf4c0ac7766..86a120243d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3642,6 +3642,7 @@ dependencies = [ "once_cell", "percent-encoding", "rand 0.8.5", + "rayon", "reed-solomon-erasure", "regex", "reqwest", diff --git a/chain/chain-primitives/src/error.rs b/chain/chain-primitives/src/error.rs index 14f2e9a482b..6c271a8e561 100644 --- a/chain/chain-primitives/src/error.rs +++ b/chain/chain-primitives/src/error.rs @@ -134,6 +134,8 @@ pub enum Error { /// Invalid chunk state. #[error("Invalid Chunk State")] InvalidChunkState(Box), + #[error("Invalid Chunk State Witness")] + InvalidChunkStateWitness, /// Invalid chunk mask #[error("Invalid Chunk Mask")] InvalidChunkMask, @@ -194,6 +196,11 @@ pub enum Error { /// Someone is not a validator. Usually happens in signature verification #[error("Not A Validator")] NotAValidator, + /// Someone is not a chunk validator. Happens if we're asked to validate a chunk we're not + /// supposed to validate, or to verify a chunk approval signed by a validator that isn't + /// supposed to validate the chunk. + #[error("Not A Chunk Validator")] + NotAChunkValidator, /// Validator error. #[error("Validator Error: {0}")] ValidatorError(String), @@ -263,6 +270,7 @@ impl Error { | Error::InvalidChunk | Error::InvalidChunkProofs(_) | Error::InvalidChunkState(_) + | Error::InvalidChunkStateWitness | Error::InvalidChunkMask | Error::InvalidStateRoot | Error::InvalidTxRoot @@ -294,6 +302,7 @@ impl Error { | Error::InvalidBlockMerkleRoot | Error::InvalidProtocolVersion | Error::NotAValidator + | Error::NotAChunkValidator | Error::InvalidChallengeRoot => true, } } @@ -334,6 +343,7 @@ impl Error { Error::InvalidChunk => "invalid_chunk", Error::InvalidChunkProofs(_) => "invalid_chunk_proofs", Error::InvalidChunkState(_) => "invalid_chunk_state", + Error::InvalidChunkStateWitness => "invalid_chunk_state_witness", Error::InvalidChunkMask => "invalid_chunk_mask", Error::InvalidStateRoot => "invalid_state_root", Error::InvalidTxRoot => "invalid_tx_root", @@ -365,6 +375,7 @@ impl Error { Error::InvalidBlockMerkleRoot => "invalid_block_merkele_root", Error::InvalidProtocolVersion => "invalid_protocol_version", Error::NotAValidator => "not_a_validator", + Error::NotAChunkValidator => "not_a_chunk_validator", Error::InvalidChallengeRoot => "invalid_challenge_root", } } diff --git a/chain/chain/src/test_utils/kv_runtime.rs b/chain/chain/src/test_utils/kv_runtime.rs index c711004e95e..ddf23a282d8 100644 --- a/chain/chain/src/test_utils/kv_runtime.rs +++ b/chain/chain/src/test_utils/kv_runtime.rs @@ -34,6 +34,7 @@ use near_primitives::types::{ AccountId, ApprovalStake, Balance, BlockHeight, EpochHeight, EpochId, Gas, Nonce, NumShards, ShardId, StateChangesForSplitStates, StateRoot, StateRootNode, ValidatorInfoIdentifier, }; +use near_primitives::validator_mandates::AssignmentWeight; use near_primitives::version::{ProtocolVersion, PROTOCOL_VERSION}; use near_primitives::views::{ AccessKeyInfoView, AccessKeyList, CallResult, ContractCodeView, EpochValidatorInfo, @@ -705,6 +706,15 @@ impl EpochManagerAdapter for MockEpochManager { Ok(chunk_producers[index].account_id().clone()) } + fn get_chunk_validators( + &self, + _epoch_id: &EpochId, + _shard_id: ShardId, + _height: BlockHeight, + ) -> Result, EpochError> { + Ok(HashMap::new()) + } + fn get_validator_by_account_id( &self, epoch_id: &EpochId, diff --git a/chain/client/Cargo.toml b/chain/client/Cargo.toml index afb5fdd7bfc..880b6ff6f75 100644 --- a/chain/client/Cargo.toml +++ b/chain/client/Cargo.toml @@ -27,6 +27,7 @@ num-rational.workspace = true once_cell.workspace = true percent-encoding.workspace = true rand.workspace = true +rayon.workspace = true reed-solomon-erasure.workspace = true regex.workspace = true reqwest.workspace = true diff --git a/chain/client/src/adapter.rs b/chain/client/src/adapter.rs index 1b2c1a4ec39..6f25925e426 100644 --- a/chain/client/src/adapter.rs +++ b/chain/client/src/adapter.rs @@ -7,6 +7,7 @@ use near_network::types::{ use near_o11y::WithSpanContextExt; use near_primitives::block::{Approval, Block, BlockHeader}; use near_primitives::challenge::Challenge; +use near_primitives::chunk_validation::ChunkStateWitness; use near_primitives::errors::InvalidTxError; use near_primitives::hash::CryptoHash; use near_primitives::network::{AnnounceAccount, PeerId}; @@ -135,6 +136,10 @@ pub enum ProcessTxResponse { DoesNotTrackShard, } +#[derive(actix::Message, Debug, PartialEq, Eq)] +#[rtype(result = "()")] +pub struct ChunkStateWitnessMessage(pub ChunkStateWitness); + pub struct Adapter { /// Address of the client actor. client_addr: actix::Addr, @@ -333,4 +338,11 @@ impl near_network::client::Client for Adapter { } } } + + async fn chunk_state_witness(&self, witness: ChunkStateWitness) { + match self.client_addr.send(ChunkStateWitnessMessage(witness).with_span_context()).await { + Ok(()) => {} + Err(err) => tracing::error!("mailbox error: {err}"), + } + } } diff --git a/chain/client/src/chunk_validation.rs b/chain/client/src/chunk_validation.rs new file mode 100644 index 00000000000..010820095ca --- /dev/null +++ b/chain/client/src/chunk_validation.rs @@ -0,0 +1,154 @@ +use std::sync::Arc; + +use near_async::messaging::{CanSend, Sender}; +use near_chain::types::RuntimeAdapter; +use near_chain_primitives::Error; +use near_epoch_manager::EpochManagerAdapter; +use near_network::types::{NetworkRequests, PeerManagerMessageRequest}; +use near_primitives::checked_feature; +use near_primitives::chunk_validation::{ + ChunkEndorsement, ChunkEndorsementInner, ChunkEndorsementMessage, ChunkStateWitness, +}; +use near_primitives::sharding::ShardChunkHeader; +use near_primitives::types::EpochId; +use near_primitives::validator_signer::ValidatorSigner; + +use crate::Client; + +/// A module that handles chunk validation logic. Chunk validation refers to a +/// critical process of stateless validation, where chunk validators (certain +/// validators selected to validate the chunk) verify that the chunk's state +/// witness is correct, and then send chunk endorsements to the block producer +/// so that the chunk can be included in the block. +pub struct ChunkValidator { + /// The signer for our own node, if we are a validator. If not, this is None. + my_signer: Option>, + epoch_manager: Arc, + network_sender: Sender, + runtime_adapter: Arc, +} + +impl ChunkValidator { + pub fn new( + my_signer: Option>, + epoch_manager: Arc, + network_sender: Sender, + runtime_adapter: Arc, + ) -> Self { + Self { my_signer, epoch_manager, network_sender, runtime_adapter } + } + + /// Performs the chunk validation logic. When done, it will send the chunk + /// endorsement message to the block producer. The actual validation logic + /// happens in a separate thread. + pub fn start_validating_chunk(&self, state_witness: ChunkStateWitness) -> Result<(), Error> { + let Some(my_signer) = self.my_signer.as_ref() else { + return Err(Error::NotAValidator); + }; + let chunk_header = state_witness.chunk_header.clone(); + let epoch_id = + self.epoch_manager.get_epoch_id_from_prev_block(chunk_header.prev_block_hash())?; + // We will only validate something if we are a chunk validator for this chunk. + // Note this also covers the case before the protocol upgrade for chunk validators, + // because the chunk validators will be empty. + let chunk_validators = self.epoch_manager.get_chunk_validators( + &epoch_id, + chunk_header.shard_id(), + chunk_header.height_created(), + )?; + if !chunk_validators.contains_key(my_signer.validator_id()) { + return Err(Error::NotAChunkValidator); + } + let block_producer = + self.epoch_manager.get_block_producer(&epoch_id, chunk_header.height_created())?; + + let network_sender = self.network_sender.clone(); + let signer = self.my_signer.clone().unwrap(); + let runtime_adapter = self.runtime_adapter.clone(); + rayon::spawn(move || match validate_chunk(&state_witness, runtime_adapter.as_ref()) { + Ok(()) => { + tracing::debug!( + target: "chunk_validation", + chunk_hash=?chunk_header.chunk_hash(), + block_producer=%block_producer, + "Chunk validated successfully, sending endorsement", + ); + let endorsement_to_sign = ChunkEndorsementInner::new(chunk_header.chunk_hash()); + network_sender.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::ChunkEndorsement(ChunkEndorsementMessage { + endorsement: ChunkEndorsement { + account_id: signer.validator_id().clone(), + signature: signer.sign_chunk_endorsement(&endorsement_to_sign), + inner: endorsement_to_sign, + }, + target: block_producer, + }), + )); + } + Err(err) => { + tracing::error!("Failed to validate chunk: {:?}", err); + } + }); + Ok(()) + } +} + +/// The actual chunk validation logic. +fn validate_chunk( + state_witness: &ChunkStateWitness, + runtime_adapter: &dyn RuntimeAdapter, +) -> Result<(), Error> { + // TODO: Replace this with actual stateless validation logic. + if state_witness.state_root != state_witness.chunk_header.prev_state_root() { + return Err(Error::InvalidChunkStateWitness); + } + // We'll need the runtime no matter what so just leaving it here to avoid an + // unused variable warning. + runtime_adapter.get_tries(); + Ok(()) +} + +impl Client { + /// Responds to a network request to verify a `ChunkStateWitness`, which is + /// sent by chunk producers after they produce a chunk. + pub fn process_chunk_state_witness(&mut self, witness: ChunkStateWitness) -> Result<(), Error> { + // TODO(#10265): We'll need to fetch some data from the chain; at the very least we need + // the previous block to exist, and we need the previous chunks' receipt roots. + // Some of this depends on delayed chunk execution. Also, if the previous block + // does not exist, we should queue this (similar to orphans) to retry later. + // For now though, we just pass it to the chunk validation logic. + self.chunk_validator.start_validating_chunk(witness) + } + + /// Distributes the chunk state witness to chunk validators that are + /// selected to validate this chunk. + pub fn send_chunk_state_witness_to_chunk_validators( + &mut self, + epoch_id: &EpochId, + chunk_header: &ShardChunkHeader, + ) -> Result<(), Error> { + let protocol_version = self.epoch_manager.get_epoch_protocol_version(epoch_id)?; + if !checked_feature!("stable", ChunkValidation, protocol_version) { + return Ok(()); + } + let chunk_validators = self.epoch_manager.get_chunk_validators( + epoch_id, + chunk_header.shard_id(), + chunk_header.height_created(), + )?; + let witness = ChunkStateWitness { + chunk_header: chunk_header.clone(), + state_root: chunk_header.prev_state_root(), + }; + tracing::debug!( + target: "chunk_validation", + "Sending chunk state witness for chunk {:?} to chunk validators {:?}", + chunk_header.chunk_hash(), + chunk_validators.keys(), + ); + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::ChunkStateWitness(chunk_validators.into_keys().collect(), witness), + )); + Ok(()) + } +} diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index e9b5404f554..aa9dbfe2bcc 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -2,6 +2,7 @@ //! This client works completely synchronously and must be operated by some async actor outside. use crate::adapter::ProcessTxResponse; +use crate::chunk_validation::ChunkValidator; use crate::debug::BlockProductionTracker; use crate::debug::PRODUCTION_TIMES_CACHE_SIZE; use crate::sync::adapter::SyncShardInfo; @@ -17,6 +18,7 @@ use chrono::DateTime; use chrono::Utc; use itertools::Itertools; use lru::LruCache; +use near_async::messaging::IntoSender; use near_async::messaging::{CanSend, Sender}; use near_chain::chain::VerifyBlockHashAndSignatureResult; use near_chain::chain::{ @@ -141,7 +143,7 @@ pub struct Client { >, pub do_not_include_chunks_from: LruCache<(EpochId, AccountId), ()>, /// Network adapter. - network_adapter: PeerManagerAdapter, + pub network_adapter: PeerManagerAdapter, /// Signer for block producer (if present). pub validator_signer: Option>, /// Approvals for which we do not have the block yet @@ -184,6 +186,8 @@ pub struct Client { /// When the "sync block" was requested. /// The "sync block" is the last block of the previous epoch, i.e. `prev_hash` of the `sync_hash` block. pub last_time_sync_block_requested: Option>, + + pub chunk_validator: ChunkValidator, } impl Client { @@ -328,6 +332,12 @@ impl Client { validator_signer.clone(), doomslug_threshold_mode, ); + let chunk_validator = ChunkValidator::new( + validator_signer.clone(), + epoch_manager.clone(), + network_adapter.clone().into_sender(), + runtime_adapter.clone(), + ); Ok(Self { #[cfg(feature = "test_features")] adv_produce_blocks: None, @@ -370,6 +380,7 @@ impl Client { tier1_accounts_cache: None, flat_storage_creator, last_time_sync_block_requested: None, + chunk_validator, }) } @@ -1732,6 +1743,12 @@ impl Client { let last_header = Chain::get_prev_chunk_header(epoch_manager, block, shard_id).unwrap(); match self.produce_chunk(*block.hash(), &epoch_id, last_header, next_height, shard_id) { Ok(Some((encoded_chunk, merkle_paths, receipts))) => { + if let Err(err) = self.send_chunk_state_witness_to_chunk_validators( + &epoch_id, + &encoded_chunk.cloned_header(), + ) { + tracing::error!(target: "client", ?err, "Failed to send chunk state witness to chunk validators"); + } self.persist_and_distribute_encoded_chunk( encoded_chunk, merkle_paths, diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs index b7f0a8c1b60..264731e3fb2 100644 --- a/chain/client/src/client_actor.rs +++ b/chain/client/src/client_actor.rs @@ -6,8 +6,8 @@ //! https://github.com/near/nearcore/issues/7899 use crate::adapter::{ - BlockApproval, BlockHeadersResponse, BlockResponse, ProcessTxRequest, ProcessTxResponse, - RecvChallenge, SetNetworkInfo, StateResponse, + BlockApproval, BlockHeadersResponse, BlockResponse, ChunkStateWitnessMessage, ProcessTxRequest, + ProcessTxResponse, RecvChallenge, SetNetworkInfo, StateResponse, }; #[cfg(feature = "test_features")] use crate::client::AdvProduceBlocksMode; @@ -1978,6 +1978,22 @@ impl Handler> for ClientActor { } } +impl Handler> for ClientActor { + type Result = (); + + #[perf] + fn handle( + &mut self, + msg: WithSpanContext, + _: &mut Context, + ) -> Self::Result { + let (_span, msg) = handler_debug_span!(target: "client", msg); + if let Err(err) = self.client.process_chunk_state_witness(msg.0) { + tracing::error!(target: "client", ?err, "Error processing chunk state witness"); + } + } +} + /// Returns random seed sampled from the current thread pub fn random_seed_from_thread() -> RngSeed { let mut rng_seed: RngSeed = [0; 32]; diff --git a/chain/client/src/lib.rs b/chain/client/src/lib.rs index da7018fc397..c826e6bafb4 100644 --- a/chain/client/src/lib.rs +++ b/chain/client/src/lib.rs @@ -22,6 +22,7 @@ pub use near_client_primitives::debug::DebugStatus; pub mod adapter; pub mod adversarial; +mod chunk_validation; mod client; mod client_actor; mod config_updater; diff --git a/chain/client/src/test_utils/setup.rs b/chain/client/src/test_utils/setup.rs index 1da634d156d..9c43596f4ef 100644 --- a/chain/client/src/test_utils/setup.rs +++ b/chain/client/src/test_utils/setup.rs @@ -834,6 +834,12 @@ pub fn setup_mock_all_validators( | NetworkRequests::TxStatus(_, _, _) | NetworkRequests::SnapshotHostInfo { .. } | NetworkRequests::Challenge(_) => {} + NetworkRequests::ChunkStateWitness(_, _) => { + // TODO(#10265): Implement for integration tests. + }, + NetworkRequests::ChunkEndorsement(_) => { + // TODO(#10265): Implement for integration tests. + }, }; } resp diff --git a/chain/client/src/test_utils/test_env.rs b/chain/client/src/test_utils/test_env.rs index 522d6a06cb3..56c66697e67 100644 --- a/chain/client/src/test_utils/test_env.rs +++ b/chain/client/src/test_utils/test_env.rs @@ -18,6 +18,7 @@ use near_network::types::{PartialEncodedChunkRequestMsg, PartialEncodedChunkResp use near_o11y::testonly::TracingCapture; use near_primitives::action::delegate::{DelegateAction, NonDelegateAction, SignedDelegateAction}; use near_primitives::block::Block; +use near_primitives::chunk_validation::ChunkEndorsementMessage; use near_primitives::epoch_manager::RngSeed; use near_primitives::errors::InvalidTxError; use near_primitives::hash::CryptoHash; @@ -46,7 +47,7 @@ pub struct TestEnv { pub client_adapters: Vec>, pub shards_manager_adapters: Vec, pub clients: Vec, - pub(crate) account_to_client_index: HashMap, + pub(crate) account_indices: AccountIndices, pub(crate) paused_blocks: Arc>>>>, // random seed to be inject in each client according to AccountId // if not set, a default constant TEST_SEED will be injected @@ -110,11 +111,11 @@ impl TestEnv { } pub fn client(&mut self, account_id: &AccountId) -> &mut Client { - &mut self.clients[self.account_to_client_index[account_id]] + self.account_indices.lookup_mut(&mut self.clients, account_id) } pub fn shards_manager(&self, account: &AccountId) -> &ShardsManagerAdapterForTest { - &self.shards_manager_adapters[self.account_to_client_index[account]] + self.account_indices.lookup(&self.shards_manager_adapters, account) } pub fn process_partial_encoded_chunks(&mut self) { @@ -122,51 +123,40 @@ impl TestEnv { let mut keep_going = true; while keep_going { + keep_going = false; // for network_adapter in network_adapters.iter() { for i in 0..network_adapters.len() { let network_adapter = network_adapters.get(i).unwrap(); let _span = tracing::debug_span!(target: "test", "process_partial_encoded_chunks", client=i).entered(); - keep_going = false; - // process partial encoded chunks - while let Some(request) = network_adapter.pop() { - // if there are any requests in any of the adapters reset - // keep going to true as processing of any message may - // trigger more messages to be processed in other clients - // it's a bit sad and it would be much nicer if all messages - // were forwarded to a single queue - // TODO would be nicer to first handle all PECs and then all PECFs - keep_going = true; - match request { - PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::PartialEncodedChunkMessage { - account_id, - partial_encoded_chunk, - }, - ) => { - let partial_encoded_chunk = - PartialEncodedChunk::from(partial_encoded_chunk); - let message = - ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk( - partial_encoded_chunk, - ); - self.shards_manager(&account_id).send(message); - } - PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::PartialEncodedChunkForward { account_id, forward }, - ) => { - let message = - ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward( - forward, - ); - self.shards_manager(&account_id).send(message); - } - _ => { - tracing::debug!(target: "test", ?request, "skipping unsupported request type"); - } + keep_going |= network_adapter.handle_filtered(|request| match request { + PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::PartialEncodedChunkMessage { + account_id, + partial_encoded_chunk, + }, + ) => { + let partial_encoded_chunk = + PartialEncodedChunk::from(partial_encoded_chunk); + let message = ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk( + partial_encoded_chunk, + ); + self.shards_manager(&account_id).send(message); + None } - } + PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::PartialEncodedChunkForward { account_id, forward }, + ) => { + let message = + ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward( + forward, + ); + self.shards_manager(&account_id).send(message); + None + } + _ => Some(request), + }); } } } @@ -189,7 +179,7 @@ impl TestEnv { NetworkRequests::PartialEncodedChunkRequest { target, request, .. }, ) = request { - let target_id = self.account_to_client_index[&target.account_id.unwrap()]; + let target_id = self.account_indices.index(&target.account_id.unwrap()); let response = self.get_partial_encoded_chunk_response(target_id, request); if let Some(response) = response { self.shards_manager_adapters[id].send( @@ -271,6 +261,53 @@ impl TestEnv { } } + pub fn propagate_chunk_state_witnesses(&mut self) { + for idx in 0..self.clients.len() { + let _span = + tracing::debug_span!(target: "test", "propagate_chunk_state_witnesses", client=idx) + .entered(); + + self.network_adapters[idx].handle_filtered(|msg| { + if let PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::ChunkStateWitness(accounts, chunk_state_witness), + ) = msg + { + for account in accounts { + self.account_indices + .lookup_mut(&mut self.clients, &account) + .process_chunk_state_witness(chunk_state_witness.clone()) + .unwrap(); + } + None + } else { + Some(msg) + } + }); + } + } + + pub fn get_all_chunk_endorsements(&mut self) -> Vec { + let mut approvals = Vec::new(); + for idx in 0..self.clients.len() { + let _span = + tracing::debug_span!(target: "test", "get_all_chunk_endorsements", client=idx) + .entered(); + + self.network_adapters[idx].handle_filtered(|msg| { + if let PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::ChunkEndorsement(endorsement), + ) = msg + { + approvals.push(endorsement); + None + } else { + Some(msg) + } + }); + } + approvals + } + pub fn send_money(&mut self, id: usize) -> ProcessTxResponse { let account_id = self.get_client_id(0); let signer = @@ -528,3 +565,19 @@ impl Drop for TestEnv { } } } + +pub(crate) struct AccountIndices(pub(crate) HashMap); + +impl AccountIndices { + pub fn index(&self, account_id: &AccountId) -> usize { + self.0[account_id] + } + + pub fn lookup<'a, T>(&self, container: &'a [T], account_id: &AccountId) -> &'a T { + &container[self.0[account_id]] + } + + pub fn lookup_mut<'a, T>(&self, container: &'a mut [T], account_id: &AccountId) -> &'a mut T { + &mut container[self.0[account_id]] + } +} diff --git a/chain/client/src/test_utils/test_env_builder.rs b/chain/client/src/test_utils/test_env_builder.rs index 02dbdeee8ce..7624786d053 100644 --- a/chain/client/src/test_utils/test_env_builder.rs +++ b/chain/client/src/test_utils/test_env_builder.rs @@ -23,7 +23,7 @@ use near_store::{NodeStorage, ShardUId, Store, StoreConfig, TrieConfig}; use super::setup::{setup_client_with_runtime, setup_synchronous_shards_manager}; use super::test_env::TestEnv; -use super::TEST_SEED; +use super::{AccountIndices, TEST_SEED}; #[derive(derive_more::From, Clone)] enum EpochManagerKind { @@ -544,12 +544,13 @@ impl TestEnvBuilder { client_adapters, shards_manager_adapters, clients, - account_to_client_index: self - .clients - .into_iter() - .enumerate() - .map(|(index, client)| (client, index)) - .collect(), + account_indices: AccountIndices( + self.clients + .into_iter() + .enumerate() + .map(|(index, client)| (client, index)) + .collect(), + ), paused_blocks: Default::default(), seeds, archive: self.archive, diff --git a/chain/client/src/tests/process_blocks.rs b/chain/client/src/tests/process_blocks.rs index 9762653c005..fa6409def30 100644 --- a/chain/client/src/tests/process_blocks.rs +++ b/chain/client/src/tests/process_blocks.rs @@ -3,6 +3,7 @@ use assert_matches::assert_matches; use near_chain::{test_utils, ChainGenesis, Provenance}; use near_crypto::vrf::Value; use near_crypto::{KeyType, PublicKey, Signature}; +use near_network::types::{NetworkRequests, PeerManagerMessageRequest}; use near_primitives::block::Block; use near_primitives::network::PeerId; use near_primitives::sharding::ShardChunkHeader; @@ -41,7 +42,12 @@ fn test_not_process_height_twice() { // check that the second block is not being processed assert!(!test_utils::is_block_in_processing(&env.clients[0].chain, &dup_block_hash)); // check that we didn't rebroadcast the second block - assert!(env.network_adapters[0].pop().is_none()); + while let Some(msg) = env.network_adapters[0].pop() { + assert!(!matches!( + msg, + PeerManagerMessageRequest::NetworkRequests(NetworkRequests::Block { .. }) + )); + } } /// Test that if a block contains chunks with invalid shard_ids, the client will return error. diff --git a/chain/epoch-manager/src/adapter.rs b/chain/epoch-manager/src/adapter.rs index a61ce5b7094..f88933ef785 100644 --- a/chain/epoch-manager/src/adapter.rs +++ b/chain/epoch-manager/src/adapter.rs @@ -18,11 +18,11 @@ use near_primitives::types::{ AccountId, ApprovalStake, Balance, BlockHeight, EpochHeight, EpochId, ShardId, ValidatorInfoIdentifier, }; +use near_primitives::validator_mandates::AssignmentWeight; use near_primitives::version::ProtocolVersion; use near_primitives::views::EpochValidatorInfo; use near_store::{ShardUId, StoreUpdate}; use std::cmp::Ordering; -#[cfg(feature = "new_epoch_sync")] use std::collections::HashMap; use std::sync::Arc; @@ -187,6 +187,14 @@ pub trait EpochManagerAdapter: Send + Sync { shard_id: ShardId, ) -> Result; + /// Gets the chunk validators for a given height and shard. + fn get_chunk_validators( + &self, + epoch_id: &EpochId, + shard_id: ShardId, + height: BlockHeight, + ) -> Result, EpochError>; + fn get_validator_by_account_id( &self, epoch_id: &EpochId, @@ -643,6 +651,16 @@ impl EpochManagerAdapter for EpochManagerHandle { Ok(epoch_manager.get_chunk_producer_info(epoch_id, height, shard_id)?.take_account_id()) } + fn get_chunk_validators( + &self, + epoch_id: &EpochId, + shard_id: ShardId, + height: BlockHeight, + ) -> Result, EpochError> { + let epoch_manager = self.read(); + epoch_manager.get_chunk_validators(epoch_id, shard_id, height) + } + fn get_validator_by_account_id( &self, epoch_id: &EpochId, diff --git a/chain/epoch-manager/src/lib.rs b/chain/epoch-manager/src/lib.rs index b2f9bb193d3..18380cb4d22 100644 --- a/chain/epoch-manager/src/lib.rs +++ b/chain/epoch-manager/src/lib.rs @@ -18,6 +18,7 @@ use near_primitives::types::{ EpochInfoProvider, NumBlocks, NumSeats, ShardId, ValidatorId, ValidatorInfoIdentifier, ValidatorKickoutReason, ValidatorStats, }; +use near_primitives::validator_mandates::AssignmentWeight; use near_primitives::version::{ProtocolVersion, UPGRADABILITY_FIX_PROTOCOL_VERSION}; use near_primitives::views::{ CurrentEpochValidatorInfo, EpochValidatorInfo, NextEpochValidatorInfo, ValidatorKickoutView, @@ -916,6 +917,30 @@ impl EpochManager { }) } + /// Returns the list of chunk validators for the given shard_id and height. + pub fn get_chunk_validators( + &self, + epoch_id: &EpochId, + shard_id: ShardId, + height: BlockHeight, + ) -> Result, EpochError> { + let epoch_info = self.get_epoch_info(epoch_id)?; + let chunk_validators_per_shard = epoch_info.sample_chunk_validators(height); + let chunk_validators = + chunk_validators_per_shard.get(shard_id as usize).ok_or_else(|| { + EpochError::ChunkValidatorSelectionError(format!( + "Invalid shard ID {} for height {}, epoch {:?} for chunk validation", + shard_id, height, epoch_id, + )) + })?; + Ok(chunk_validators + .iter() + .map(|(validator_id, seats)| { + (epoch_info.get_validator(*validator_id).take_account_id(), seats.clone()) + }) + .collect()) + } + /// get_heuristic_block_approvers_ordered: block producers for epoch /// get_all_block_producers_ordered: block producers for epoch, slashing info /// get_all_block_approvers_ordered: block producers for epoch, slashing info, sometimes block producers for next epoch diff --git a/chain/network/src/client.rs b/chain/network/src/client.rs index 05083aef5cf..887b289940b 100644 --- a/chain/network/src/client.rs +++ b/chain/network/src/client.rs @@ -4,6 +4,7 @@ use crate::types::{NetworkInfo, ReasonForBan}; use near_primitives::block::{Approval, Block, BlockHeader}; use near_primitives::challenge::Challenge; +use near_primitives::chunk_validation::ChunkStateWitness; use near_primitives::hash::CryptoHash; use near_primitives::network::{AnnounceAccount, PeerId}; use near_primitives::transaction::SignedTransaction; @@ -62,6 +63,8 @@ pub trait Client: Send + Sync + 'static { &self, accounts: Vec<(AnnounceAccount, Option)>, ) -> Result, ReasonForBan>; + + async fn chunk_state_witness(&self, witness: ChunkStateWitness); } /// Implementation of Client which doesn't do anything and never returns errors. @@ -129,4 +132,6 @@ impl Client for Noop { ) -> Result, ReasonForBan> { Ok(vec![]) } + + async fn chunk_state_witness(&self, _witness: ChunkStateWitness) {} } diff --git a/chain/network/src/network_protocol/mod.rs b/chain/network/src/network_protocol/mod.rs index 10b4822121c..c412877bcc9 100644 --- a/chain/network/src/network_protocol/mod.rs +++ b/chain/network/src/network_protocol/mod.rs @@ -7,6 +7,8 @@ mod peer; mod proto_conv; mod state_sync; pub use edge::*; +use near_primitives::chunk_validation::ChunkEndorsement; +use near_primitives::chunk_validation::ChunkStateWitness; pub use peer::*; pub use state_sync::*; @@ -540,6 +542,9 @@ pub enum RoutedMessageBody { VersionedPartialEncodedChunk(PartialEncodedChunk), _UnusedVersionedStateResponse, PartialEncodedChunkForward(PartialEncodedChunkForwardMsg), + + ChunkStateWitness(ChunkStateWitness), + ChunkEndorsement(ChunkEndorsement), } impl RoutedMessageBody { @@ -548,10 +553,11 @@ impl RoutedMessageBody { // lost pub fn is_important(&self) -> bool { match self { - // Both BlockApproval and VersionedPartialEncodedChunk is essential for block production and - // are only sent by the original node and if they are lost, the receiver node doesn't - // know to request them. + // These messages are important because they are critical for block and chunk production, + // and lost messages cannot be requested again. RoutedMessageBody::BlockApproval(_) + | RoutedMessageBody::ChunkEndorsement(_) + | RoutedMessageBody::ChunkStateWitness(_) | RoutedMessageBody::VersionedPartialEncodedChunk(_) => true, _ => false, } @@ -604,6 +610,8 @@ impl fmt::Debug for RoutedMessageBody { RoutedMessageBody::Ping(_) => write!(f, "Ping"), RoutedMessageBody::Pong(_) => write!(f, "Pong"), RoutedMessageBody::_UnusedVersionedStateResponse => write!(f, "VersionedStateResponse"), + RoutedMessageBody::ChunkStateWitness(_) => write!(f, "ChunkStateWitness"), + RoutedMessageBody::ChunkEndorsement(_) => write!(f, "ChunkEndorsement"), } } } diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index ffe74e1520e..f8aa249bf69 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -988,6 +988,14 @@ impl PeerActor { // variant completely. None } + RoutedMessageBody::ChunkStateWitness(witness) => { + network_state.client.chunk_state_witness(witness).await; + None + } + RoutedMessageBody::ChunkEndorsement(_) => { + // TODO(#10265): Handle chunk approvals. + None + } body => { tracing::error!(target: "network", "Peer received unexpected message type: {:?}", body); None diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index 5dae71760bc..9840e981807 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -958,6 +958,24 @@ impl PeerManagerActor { self.state.tier2.broadcast_message(Arc::new(PeerMessage::Challenge(challenge))); NetworkResponses::NoResponse } + NetworkRequests::ChunkStateWitness(chunk_validators, state_witness) => { + for chunk_validator in chunk_validators { + self.state.send_message_to_account( + &self.clock, + &chunk_validator, + RoutedMessageBody::ChunkStateWitness(state_witness.clone()), + ); + } + NetworkResponses::NoResponse + } + NetworkRequests::ChunkEndorsement(approval) => { + self.state.send_message_to_account( + &self.clock, + &approval.target, + RoutedMessageBody::ChunkEndorsement(approval.endorsement), + ); + NetworkResponses::NoResponse + } } } diff --git a/chain/network/src/test_utils.rs b/chain/network/src/test_utils.rs index c2ba09351cb..5610bf14e85 100644 --- a/chain/network/src/test_utils.rs +++ b/chain/network/src/test_utils.rs @@ -269,6 +269,29 @@ impl MockPeerManagerAdapter { pub fn put_back_most_recent(&self, request: PeerManagerMessageRequest) { self.requests.write().unwrap().push_back(request); } + /// Calls the handler for each message, but removing only those for which the handler returns + /// None (or else the returned message gets requeued; the returned message should be the same + /// as the one given). Returns true if any message was removed. + pub fn handle_filtered( + &self, + mut f: impl FnMut(PeerManagerMessageRequest) -> Option, + ) -> bool { + // We get the count first and then pop one by one, that way we avoid + // grabbing the lock for the whole duration, which might lead to a + // deadlock if the processing of the request results in more network + // messages. + let num_requests = self.requests.read().unwrap().len(); + let mut handled = false; + for _ in 0..num_requests { + let request = self.requests.write().unwrap().pop_front().unwrap(); + if let Some(request) = f(request) { + self.requests.write().unwrap().push_back(request); + } else { + handled = true; + } + } + handled + } } #[derive(actix::Message, Clone, Debug)] diff --git a/chain/network/src/testonly/fake_client.rs b/chain/network/src/testonly/fake_client.rs index 5d2b0d00c96..559912ba04d 100644 --- a/chain/network/src/testonly/fake_client.rs +++ b/chain/network/src/testonly/fake_client.rs @@ -6,6 +6,7 @@ use crate::types::{NetworkInfo, ReasonForBan, StateResponseInfoV2}; use near_async::messaging; use near_primitives::block::{Approval, Block, BlockHeader}; use near_primitives::challenge::Challenge; +use near_primitives::chunk_validation::ChunkStateWitness; use near_primitives::hash::CryptoHash; use near_primitives::network::{AnnounceAccount, PeerId}; use near_primitives::sharding::{ChunkHash, PartialEncodedChunkPart}; @@ -25,6 +26,7 @@ pub enum Event { Challenge(Challenge), Chunk(Vec), ChunkRequest(ChunkHash), + ChunkStateWitness(ChunkStateWitness), Transaction(SignedTransaction), } @@ -117,6 +119,10 @@ impl client::Client for Fake { self.event_sink.push(Event::AnnounceAccount(accounts.clone())); Ok(accounts.into_iter().map(|a| a.0).collect()) } + + async fn chunk_state_witness(&self, witness: ChunkStateWitness) { + self.event_sink.push(Event::ChunkStateWitness(witness)); + } } impl messaging::CanSend for Fake { diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 24b8ce2a40c..eee397b2581 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -18,6 +18,7 @@ use near_async::time; use near_crypto::PublicKey; use near_primitives::block::{ApprovalMessage, Block, GenesisId}; use near_primitives::challenge::Challenge; +use near_primitives::chunk_validation::{ChunkEndorsementMessage, ChunkStateWitness}; use near_primitives::hash::CryptoHash; use near_primitives::network::{AnnounceAccount, PeerId}; use near_primitives::sharding::PartialEncodedChunkWithArcReceipts; @@ -257,6 +258,11 @@ pub enum NetworkRequests { TxStatus(AccountId, AccountId, CryptoHash), /// A challenge to invalidate a block. Challenge(Challenge), + /// A chunk's state witness. + ChunkStateWitness(Vec, ChunkStateWitness), + /// Message for a chunk endorsement, sent by a chunk validator to + /// the block producer. + ChunkEndorsement(ChunkEndorsementMessage), } /// Combines peer address info, chain. diff --git a/core/primitives/src/chunk_validation.rs b/core/primitives/src/chunk_validation.rs new file mode 100644 index 00000000000..65e997b8561 --- /dev/null +++ b/core/primitives/src/chunk_validation.rs @@ -0,0 +1,51 @@ +use crate::sharding::{ChunkHash, ShardChunkHeader}; +use crate::types::StateRoot; +use borsh::{BorshDeserialize, BorshSerialize}; +use near_crypto::Signature; +use near_primitives_core::types::AccountId; + +/// The state witness for a chunk; proves the state transition that the +/// chunk attests to. +#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)] +pub struct ChunkStateWitness { + // TODO(#10265): Is the entire header necessary? + pub chunk_header: ShardChunkHeader, + // TODO(#10265): Replace this with fields for the actual witness. + pub state_root: StateRoot, +} + +/// The endorsement of a chunk by a chunk validator. By providing this, a +/// chunk validator has verified that the chunk state witness is correct. +#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)] +pub struct ChunkEndorsement { + pub inner: ChunkEndorsementInner, + pub account_id: AccountId, + pub signature: Signature, +} + +/// This is the part of the chunk endorsement that is actually being signed. +#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)] +pub struct ChunkEndorsementInner { + pub chunk_hash: ChunkHash, + /// An arbitrary static string to make sure that this struct cannot be + /// serialized to look identical to another serialized struct. For chunk + /// production we are signing a chunk hash, so we need to make sure that + /// this signature means something different. + /// + /// This is a messy workaround until we know what to do with NEP 483. + signature_differentiator: String, +} + +impl ChunkEndorsementInner { + pub fn new(chunk_hash: ChunkHash) -> Self { + Self { chunk_hash, signature_differentiator: "ChunkEndorsement".to_owned() } + } +} + +/// Message intended for the network layer to send a chunk endorsement. +/// It just includes an additional target account ID to send it to. +#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)] +pub struct ChunkEndorsementMessage { + pub endorsement: ChunkEndorsement, + pub target: AccountId, +} diff --git a/core/primitives/src/errors.rs b/core/primitives/src/errors.rs index 8b32fbbf8bd..a816581240e 100644 --- a/core/primitives/src/errors.rs +++ b/core/primitives/src/errors.rs @@ -857,6 +857,8 @@ pub enum EpochError { num_validators: u64, num_shards: u64, }, + /// Error selecting validators for a chunk. + ChunkValidatorSelectionError(String), } impl std::error::Error for EpochError {} @@ -881,6 +883,9 @@ impl Display for EpochError { EpochError::NotEnoughValidators { num_shards, num_validators } => { write!(f, "There were not enough validator proposals to fill all shards. num_proposals: {}, num_shards: {}", num_validators, num_shards) } + EpochError::ChunkValidatorSelectionError(err) => { + write!(f, "Error selecting validators for a chunk: {}", err) + } } } } @@ -901,6 +906,9 @@ impl Debug for EpochError { EpochError::NotEnoughValidators { num_shards, num_validators } => { write!(f, "NotEnoughValidators({}, {})", num_validators, num_shards) } + EpochError::ChunkValidatorSelectionError(err) => { + write!(f, "ChunkValidatorSelectionError({})", err) + } } } } diff --git a/core/primitives/src/lib.rs b/core/primitives/src/lib.rs index 315173ad97e..300348904e0 100644 --- a/core/primitives/src/lib.rs +++ b/core/primitives/src/lib.rs @@ -10,6 +10,7 @@ pub mod block; pub mod block_header; pub mod chains; pub mod challenge; +pub mod chunk_validation; pub mod epoch_manager; pub mod epoch_sync; pub mod errors; diff --git a/core/primitives/src/validator_signer.rs b/core/primitives/src/validator_signer.rs index 8d082e9377b..5a9c8879d5f 100644 --- a/core/primitives/src/validator_signer.rs +++ b/core/primitives/src/validator_signer.rs @@ -5,6 +5,7 @@ use near_crypto::{InMemorySigner, KeyType, PublicKey, Signature, Signer}; use crate::block::{Approval, ApprovalInner, BlockHeader}; use crate::challenge::ChallengeBody; +use crate::chunk_validation::ChunkEndorsementInner; use crate::hash::CryptoHash; use crate::network::{AnnounceAccount, PeerId}; use crate::sharding::ChunkHash; @@ -36,6 +37,9 @@ pub trait ValidatorSigner: Sync + Send { /// Signs approval of given parent hash and reference hash. fn sign_approval(&self, inner: &ApprovalInner, target_height: BlockHeight) -> Signature; + /// Signs approval of the given chunk. + fn sign_chunk_endorsement(&self, inner: &ChunkEndorsementInner) -> Signature; + /// Signs challenge body. fn sign_challenge(&self, challenge_body: &ChallengeBody) -> (CryptoHash, Signature); @@ -108,6 +112,10 @@ impl ValidatorSigner for EmptyValidatorSigner { Signature::default() } + fn sign_chunk_endorsement(&self, _inner: &ChunkEndorsementInner) -> Signature { + Signature::default() + } + fn sign_challenge(&self, challenge_body: &ChallengeBody) -> (CryptoHash, Signature) { (CryptoHash::hash_borsh(challenge_body), Signature::default()) } @@ -199,6 +207,10 @@ impl ValidatorSigner for InMemoryValidatorSigner { self.signer.sign(&Approval::get_data_for_sig(inner, target_height)) } + fn sign_chunk_endorsement(&self, inner: &ChunkEndorsementInner) -> Signature { + self.signer.sign(&borsh::to_vec(inner).unwrap()) + } + fn sign_challenge(&self, challenge_body: &ChallengeBody) -> (CryptoHash, Signature) { let hash = CryptoHash::hash_borsh(challenge_body); let signature = self.signer.sign(hash.as_ref()); diff --git a/integration-tests/src/tests/client/challenges.rs b/integration-tests/src/tests/client/challenges.rs index 95d636287f3..9f4fe6164f6 100644 --- a/integration-tests/src/tests/client/challenges.rs +++ b/integration-tests/src/tests/client/challenges.rs @@ -183,15 +183,16 @@ fn test_verify_block_double_sign_challenge() { let result = env.clients[0].process_block_test(b2.into(), Provenance::SYNC); assert!(result.is_ok()); - let mut last_message = env.network_adapters[0].pop().unwrap().as_network_requests(); - if let NetworkRequests::Block { .. } = last_message { - last_message = env.network_adapters[0].pop().unwrap().as_network_requests(); - } - if let NetworkRequests::Challenge(network_challenge) = last_message { - assert_eq!(network_challenge, valid_challenge); - } else { - assert!(false); + + let mut seen_challenge = false; + while let Some(message) = env.network_adapters[0].pop() { + if let NetworkRequests::Challenge(network_challenge) = message.as_network_requests() { + assert_eq!(network_challenge, valid_challenge); + seen_challenge = true; + break; + } } + assert!(seen_challenge); } fn create_invalid_proofs_chunk( @@ -500,13 +501,15 @@ fn test_verify_chunk_invalid_state_challenge() { let result = client.process_block_test(block.into(), Provenance::NONE); assert!(result.is_err()); - let last_message = env.network_adapters[0].pop().unwrap().as_network_requests(); - - if let NetworkRequests::Challenge(network_challenge) = last_message { - assert_eq!(challenge, network_challenge); - } else { - assert!(false); + let mut seen_challenge = false; + while let Some(message) = env.network_adapters[0].pop() { + if let NetworkRequests::Challenge(network_challenge) = message.as_network_requests() { + assert_eq!(network_challenge, challenge); + seen_challenge = true; + break; + } } + assert!(seen_challenge); } /// Receive invalid state transition in chunk as a validator / non-producer. diff --git a/integration-tests/src/tests/client/features.rs b/integration-tests/src/tests/client/features.rs index 81da1a41ec9..4b4449eed9d 100644 --- a/integration-tests/src/tests/client/features.rs +++ b/integration-tests/src/tests/client/features.rs @@ -5,6 +5,7 @@ mod account_id_in_function_call_permission; mod adversarial_behaviors; mod cap_max_gas_price; mod chunk_nodes_cache; +mod chunk_validation; mod delegate_action; #[cfg(feature = "protocol_feature_fix_contract_loading_cost")] mod fix_contract_loading_cost; diff --git a/integration-tests/src/tests/client/features/access_key_nonce_for_implicit_accounts.rs b/integration-tests/src/tests/client/features/access_key_nonce_for_implicit_accounts.rs index 6b0f6daaf17..acb1b3b4aa0 100644 --- a/integration-tests/src/tests/client/features/access_key_nonce_for_implicit_accounts.rs +++ b/integration-tests/src/tests/client/features/access_key_nonce_for_implicit_accounts.rs @@ -784,6 +784,8 @@ impl ChunkForwardingOptimizationTestData { ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward(forward), ); } + NetworkRequests::ChunkStateWitness(_, _) => {} + NetworkRequests::ChunkEndorsement(_) => {} _ => { panic!("Unexpected network request: {:?}", requests); } diff --git a/integration-tests/src/tests/client/features/adversarial_behaviors.rs b/integration-tests/src/tests/client/features/adversarial_behaviors.rs index 4468efd71dc..b01dcca33ec 100644 --- a/integration-tests/src/tests/client/features/adversarial_behaviors.rs +++ b/integration-tests/src/tests/client/features/adversarial_behaviors.rs @@ -84,6 +84,12 @@ impl AdversarialBehaviorTestData { NetworkRequests::Challenge(_) => { // challenges not enabled. } + NetworkRequests::ChunkEndorsement(_) => { + // TODO(#10265). + } + NetworkRequests::ChunkStateWitness(_, _) => { + // TODO(#10265). + } _ => { panic!("Unexpected network request: {:?}", requests); } diff --git a/integration-tests/src/tests/client/features/chunk_validation.rs b/integration-tests/src/tests/client/features/chunk_validation.rs new file mode 100644 index 00000000000..152dc8c131b --- /dev/null +++ b/integration-tests/src/tests/client/features/chunk_validation.rs @@ -0,0 +1,143 @@ +use near_chain::{ChainGenesis, Provenance}; +use near_chain_configs::{Genesis, GenesisConfig, GenesisRecords}; +use near_client::test_utils::TestEnv; +use near_o11y::testonly::init_test_logger; +use near_primitives::block::Tip; +use near_primitives::shard_layout::ShardLayout; +use near_primitives::state_record::StateRecord; +use near_primitives::test_utils::create_test_signer; +use near_primitives::types::AccountInfo; +use near_primitives_core::account::Account; +use near_primitives_core::checked_feature; +use near_primitives_core::hash::CryptoHash; +use near_primitives_core::types::AccountId; +use near_primitives_core::version::PROTOCOL_VERSION; +use nearcore::test_utils::TestEnvNightshadeSetupExt; +use std::collections::HashSet; + +const ONE_NEAR: u128 = 1_000_000_000_000_000_000_000_000; + +#[test] +fn test_chunk_validation_basic() { + init_test_logger(); + + if !checked_feature!("stable", ChunkValidation, PROTOCOL_VERSION) { + println!("Test not applicable without ChunkValidation enabled"); + return; + } + + let validator_stake = 1000000 * ONE_NEAR; + let accounts = + (0..9).map(|i| format!("account{}", i).parse().unwrap()).collect::>(); + let mut genesis_config = GenesisConfig { + // Use the latest protocol version. Otherwise, the version may be too + // old that e.g. blocks don't even store previous heights. + protocol_version: PROTOCOL_VERSION, + // Some arbitrary starting height. Doesn't matter. + genesis_height: 10000, + // We'll use four shards for this test. + shard_layout: ShardLayout::get_simple_nightshade_layout(), + // Make 8 validators, which means 2 will be assigned as chunk validators + // for each chunk. + validators: accounts + .iter() + .take(8) + .map(|account_id| AccountInfo { + account_id: account_id.clone(), + public_key: create_test_signer(account_id.as_str()).public_key(), + amount: validator_stake, + }) + .collect(), + // We don't care about epoch transitions in this test. + epoch_length: 10000, + // The genesis requires this, so set it to something arbitrary. + protocol_treasury_account: accounts[8].clone(), + // Simply make all validators block producers. + num_block_producer_seats: 8, + // Make all validators produce chunks for all shards. + minimum_validators_per_shard: 8, + // Even though not used for the most recent protocol version, + // this must still have the same length as the number of shards, + // or else the genesis fails validation. + num_block_producer_seats_per_shard: vec![8, 8, 8, 8], + ..Default::default() + }; + + // Set up the records corresponding to the validator accounts. + let mut records = Vec::new(); + for (i, account) in accounts.iter().enumerate() { + // The staked amount must be consistent with validators from genesis. + let staked = if i < 8 { validator_stake } else { 0 }; + records.push(StateRecord::Account { + account_id: account.clone(), + account: Account::new(0, staked, CryptoHash::default(), 0), + }); + // The total supply must be correct to pass validation. + genesis_config.total_supply += staked; + } + let genesis = Genesis::new(genesis_config, GenesisRecords(records)).unwrap(); + let chain_genesis = ChainGenesis::new(&genesis); + + let mut env = TestEnv::builder(chain_genesis) + .clients(accounts.iter().take(8).cloned().collect()) + .real_epoch_managers(&genesis.config) + .nightshade_runtimes(&genesis) + .build(); + + for round in 0..10 { + let heads = env + .clients + .iter() + .map(|client| client.chain.head().unwrap().last_block_hash) + .collect::>(); + assert_eq!(heads.len(), 1, "All clients should have the same head"); + let tip = env.clients[0].chain.head().unwrap(); + + let block_producer = get_block_producer(&env, &tip, 1); + println!("Producing block at height {} by {}", tip.height + 1, block_producer); + let block = env.client(&block_producer).produce_block(tip.height + 1).unwrap().unwrap(); + if round > 1 { + for i in 0..4 { + let chunks = block.chunks(); + let chunk = chunks.get(i).unwrap(); + assert_eq!(chunk.height_created(), chunk.height_included()); + } + } + + // Apply the block. + for i in 0..env.clients.len() { + println!( + " Applying block at height {} at {}", + block.header().height(), + env.get_client_id(i) + ); + let blocks_processed = + env.clients[i].process_block_test(block.clone().into(), Provenance::NONE).unwrap(); + assert_eq!(blocks_processed, vec![*block.hash()]); + } + + env.process_partial_encoded_chunks(); + for j in 0..env.clients.len() { + env.process_shards_manager_responses_and_finish_processing_blocks(j); + } + env.propagate_chunk_state_witnesses(); + } + + // Wait a bit and check that we've received at least some chunk approvals. + // TODO(#10265): We need to make this not time-based, and we need to assert + // exactly how many approvals (or total stake) we have. + std::thread::sleep(std::time::Duration::from_secs(1)); + let approvals = env.get_all_chunk_endorsements(); + assert!(!approvals.is_empty()); +} + +// Returns the block producer for the height of head + height_offset. +fn get_block_producer(env: &TestEnv, head: &Tip, height_offset: u64) -> AccountId { + let client = &env.clients[0]; + let epoch_manager = &client.epoch_manager; + let parent_hash = &head.last_block_hash; + let epoch_id = epoch_manager.get_epoch_id_from_prev_block(parent_hash).unwrap(); + let height = head.height + height_offset; + let block_producer = epoch_manager.get_block_producer(&epoch_id, height).unwrap(); + block_producer +} diff --git a/tools/chainsync-loadtest/src/network.rs b/tools/chainsync-loadtest/src/network.rs index 0bdfa25502c..8c19df805b1 100644 --- a/tools/chainsync-loadtest/src/network.rs +++ b/tools/chainsync-loadtest/src/network.rs @@ -13,6 +13,7 @@ use near_network::types::{ }; use near_primitives::block::{Approval, Block, BlockHeader}; use near_primitives::challenge::Challenge; +use near_primitives::chunk_validation::ChunkStateWitness; use near_primitives::hash::CryptoHash; use near_primitives::network::{AnnounceAccount, PeerId}; use near_primitives::sharding::ChunkHash; @@ -302,4 +303,6 @@ impl near_network::client::Client for Network { ) -> Result, ReasonForBan> { Ok(accounts.into_iter().map(|a| a.0).collect()) } + + async fn chunk_state_witness(&self, _witness: ChunkStateWitness) {} }