Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement simple state witness distribution, chunk validation, and chunk approval distribution skeletons. #10287

Merged
merged 7 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions chain/chain-primitives/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ pub enum Error {
/// Invalid chunk state.
#[error("Invalid Chunk State")]
InvalidChunkState(Box<ChunkState>),
#[error("Invalid Chunk State Witness")]
InvalidChunkStateWitness,
/// Invalid chunk mask
#[error("Invalid Chunk Mask")]
InvalidChunkMask,
Expand Down Expand Up @@ -194,6 +196,11 @@ pub enum Error {
/// Someone is not a validator. Usually happens in signature verification
#[error("Not A Validator")]
NotAValidator,
/// Someone is not a chunk validator. Happens if we're asked to validate a chunk we're not
/// supposed to validate, or to verify a chunk approval signed by a validator that isn't
/// supposed to validate the chunk.
#[error("Not A Chunk Validator")]
NotAChunkValidator,
/// Validator error.
#[error("Validator Error: {0}")]
ValidatorError(String),
Expand Down Expand Up @@ -263,6 +270,7 @@ impl Error {
| Error::InvalidChunk
| Error::InvalidChunkProofs(_)
| Error::InvalidChunkState(_)
| Error::InvalidChunkStateWitness
| Error::InvalidChunkMask
| Error::InvalidStateRoot
| Error::InvalidTxRoot
Expand Down Expand Up @@ -294,6 +302,7 @@ impl Error {
| Error::InvalidBlockMerkleRoot
| Error::InvalidProtocolVersion
| Error::NotAValidator
| Error::NotAChunkValidator
| Error::InvalidChallengeRoot => true,
}
}
Expand Down Expand Up @@ -334,6 +343,7 @@ impl Error {
Error::InvalidChunk => "invalid_chunk",
Error::InvalidChunkProofs(_) => "invalid_chunk_proofs",
Error::InvalidChunkState(_) => "invalid_chunk_state",
Error::InvalidChunkStateWitness => "invalid_chunk_state_witness",
Error::InvalidChunkMask => "invalid_chunk_mask",
Error::InvalidStateRoot => "invalid_state_root",
Error::InvalidTxRoot => "invalid_tx_root",
Expand Down Expand Up @@ -365,6 +375,7 @@ impl Error {
Error::InvalidBlockMerkleRoot => "invalid_block_merkele_root",
Error::InvalidProtocolVersion => "invalid_protocol_version",
Error::NotAValidator => "not_a_validator",
Error::NotAChunkValidator => "not_a_chunk_validator",
Error::InvalidChallengeRoot => "invalid_challenge_root",
}
}
Expand Down
10 changes: 10 additions & 0 deletions chain/chain/src/test_utils/kv_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use near_primitives::types::{
AccountId, ApprovalStake, Balance, BlockHeight, EpochHeight, EpochId, Gas, Nonce, NumShards,
ShardId, StateChangesForSplitStates, StateRoot, StateRootNode, ValidatorInfoIdentifier,
};
use near_primitives::validator_mandates::AssignmentWeight;
use near_primitives::version::{ProtocolVersion, PROTOCOL_VERSION};
use near_primitives::views::{
AccessKeyInfoView, AccessKeyList, CallResult, ContractCodeView, EpochValidatorInfo,
Expand Down Expand Up @@ -704,6 +705,15 @@ impl EpochManagerAdapter for MockEpochManager {
Ok(chunk_producers[index].account_id().clone())
}

fn get_chunk_validators(
&self,
_epoch_id: &EpochId,
_shard_id: ShardId,
_height: BlockHeight,
) -> Result<HashMap<AccountId, AssignmentWeight>, EpochError> {
Ok(HashMap::new())
}

fn get_validator_by_account_id(
&self,
epoch_id: &EpochId,
Expand Down
1 change: 1 addition & 0 deletions chain/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ num-rational.workspace = true
once_cell.workspace = true
percent-encoding.workspace = true
rand.workspace = true
rayon.workspace = true
reed-solomon-erasure.workspace = true
regex.workspace = true
reqwest.workspace = true
Expand Down
12 changes: 12 additions & 0 deletions chain/client/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use near_network::types::{
use near_o11y::WithSpanContextExt;
use near_primitives::block::{Approval, Block, BlockHeader};
use near_primitives::challenge::Challenge;
use near_primitives::chunk_validation::ChunkStateWitness;
use near_primitives::errors::InvalidTxError;
use near_primitives::hash::CryptoHash;
use near_primitives::network::{AnnounceAccount, PeerId};
Expand Down Expand Up @@ -135,6 +136,10 @@ pub enum ProcessTxResponse {
DoesNotTrackShard,
}

#[derive(actix::Message, Debug, PartialEq, Eq)]
#[rtype(result = "()")]
pub struct ChunkStateWitnessMessage(pub ChunkStateWitness);

pub struct Adapter {
/// Address of the client actor.
client_addr: actix::Addr<ClientActor>,
Expand Down Expand Up @@ -333,4 +338,11 @@ impl near_network::client::Client for Adapter {
}
}
}

async fn chunk_state_witness(&self, witness: ChunkStateWitness) {
match self.client_addr.send(ChunkStateWitnessMessage(witness).with_span_context()).await {
Ok(()) => {}
Err(err) => tracing::error!("mailbox error: {err}"),
}
}
}
154 changes: 154 additions & 0 deletions chain/client/src/chunk_validation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use std::sync::Arc;

use near_async::messaging::{CanSend, Sender};
use near_chain::types::RuntimeAdapter;
use near_chain_primitives::Error;
use near_epoch_manager::EpochManagerAdapter;
use near_network::types::{NetworkRequests, PeerManagerMessageRequest};
use near_primitives::checked_feature;
use near_primitives::chunk_validation::{
ChunkEndorsement, ChunkEndorsementInner, ChunkEndorsementMessage, ChunkStateWitness,
};
use near_primitives::sharding::ShardChunkHeader;
use near_primitives::types::EpochId;
use near_primitives::validator_signer::ValidatorSigner;

use crate::Client;

/// A module that handles chunk validation logic. Chunk validation refers to a
/// critical process of stateless validation, where chunk validators (certain
/// validators selected to validate the chunk) verify that the chunk's state
/// witness is correct, and then send chunk endorsements to the block producer
/// so that the chunk can be included in the block.
pub struct ChunkValidator {
/// The signer for our own node, if we are a validator. If not, this is None.
my_signer: Option<Arc<dyn ValidatorSigner>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does 'my_signer' mean? :o

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm a validator, this is my signer. It's the signer of the node itself. I'll make a comment.

epoch_manager: Arc<dyn EpochManagerAdapter>,
network_sender: Sender<PeerManagerMessageRequest>,
runtime_adapter: Arc<dyn RuntimeAdapter>,
}

impl ChunkValidator {
pub fn new(
my_signer: Option<Arc<dyn ValidatorSigner>>,
epoch_manager: Arc<dyn EpochManagerAdapter>,
network_sender: Sender<PeerManagerMessageRequest>,
runtime_adapter: Arc<dyn RuntimeAdapter>,
) -> Self {
Self { my_signer, epoch_manager, network_sender, runtime_adapter }
}

/// Performs the chunk validation logic. When done, it will send the chunk
/// endorsement message to the block producer. The actual validation logic
/// happens in a separate thread.
pub fn start_validating_chunk(&self, state_witness: ChunkStateWitness) -> Result<(), Error> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity, how is the word 'start' used in rust? Does it indicate async nature of the function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think Rust functions commonly spawn an asynchronous task to do things. Those that do are called "spawn(...)" but that specifically spawns a generic function or task into some thread. So I'm following the naming used in Chain::start_processing_block etc.

let Some(my_signer) = self.my_signer.as_ref() else {
return Err(Error::NotAValidator);
};
let chunk_header = state_witness.chunk_header.clone();
let epoch_id =
self.epoch_manager.get_epoch_id_from_prev_block(chunk_header.prev_block_hash())?;
// We will only validate something if we are a chunk validator for this chunk.
// Note this also covers the case before the protocol upgrade for chunk validators,
// because the chunk validators will be empty.
let chunk_validators = self.epoch_manager.get_chunk_validators(
&epoch_id,
chunk_header.shard_id(),
chunk_header.height_created(),
)?;
if !chunk_validators.contains_key(my_signer.validator_id()) {
return Err(Error::NotAChunkValidator);
}
let block_producer =
self.epoch_manager.get_block_producer(&epoch_id, chunk_header.height_created())?;

let network_sender = self.network_sender.clone();
let signer = self.my_signer.clone().unwrap();
let runtime_adapter = self.runtime_adapter.clone();
rayon::spawn(move || match validate_chunk(&state_witness, runtime_adapter.as_ref()) {
Ok(()) => {
tracing::debug!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Having chunk hash and block producer not as a part of the message should make logs easier to work with. Also that seems to be a bit more common pattern in our code base. So maybe consider moving those outside of the message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean moving them as fields?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, having separate fields instead of string interpolation into the message

target: "chunk_validation",
chunk_hash=?chunk_header.chunk_hash(),
block_producer=%block_producer,
"Chunk validated successfully, sending endorsement",
);
let endorsement_to_sign = ChunkEndorsementInner::new(chunk_header.chunk_hash());
network_sender.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::ChunkEndorsement(ChunkEndorsementMessage {
endorsement: ChunkEndorsement {
account_id: signer.validator_id().clone(),
signature: signer.sign_chunk_endorsement(&endorsement_to_sign),
inner: endorsement_to_sign,
},
target: block_producer,
}),
));
}
Err(err) => {
tracing::error!("Failed to validate chunk: {:?}", err);
}
});
Ok(())
}
}

/// The actual chunk validation logic.
fn validate_chunk(
state_witness: &ChunkStateWitness,
runtime_adapter: &dyn RuntimeAdapter,
) -> Result<(), Error> {
// TODO: Replace this with actual stateless validation logic.
if state_witness.state_root != state_witness.chunk_header.prev_state_root() {
return Err(Error::InvalidChunkStateWitness);
}
// We'll need the runtime no matter what so just leaving it here to avoid an
// unused variable warning.
runtime_adapter.get_tries();
Ok(())
}

impl Client {
/// Responds to a network request to verify a `ChunkStateWitness`, which is
/// sent by chunk producers after they produce a chunk.
pub fn process_chunk_state_witness(&mut self, witness: ChunkStateWitness) -> Result<(), Error> {
// TODO(#10265): We'll need to fetch some data from the chain; at the very least we need
// the previous block to exist, and we need the previous chunks' receipt roots.
// Some of this depends on delayed chunk execution. Also, if the previous block
// does not exist, we should queue this (similar to orphans) to retry later.
// For now though, we just pass it to the chunk validation logic.
self.chunk_validator.start_validating_chunk(witness)
}

/// Distributes the chunk state witness to chunk validators that are
/// selected to validate this chunk.
pub fn send_chunk_state_witness_to_chunk_validators(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is little confusing. Is this client used by both chunk proposer AND chunk validator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, Client is used by everyone so this is just an additional function on the Client. I didn't want to put it in client.rs, so that I don't further increase the size of that file.

&mut self,
epoch_id: &EpochId,
chunk_header: &ShardChunkHeader,
) -> Result<(), Error> {
let protocol_version = self.epoch_manager.get_epoch_protocol_version(epoch_id)?;
if !checked_feature!("stable", ChunkValidation, protocol_version) {
return Ok(());
}
let chunk_validators = self.epoch_manager.get_chunk_validators(
epoch_id,
chunk_header.shard_id(),
chunk_header.height_created(),
)?;
let witness = ChunkStateWitness {
chunk_header: chunk_header.clone(),
state_root: chunk_header.prev_state_root(),
};
tracing::debug!(
target: "chunk_validation",
"Sending chunk state witness for chunk {:?} to chunk validators {:?}",
chunk_header.chunk_hash(),
chunk_validators.keys(),
);
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::ChunkStateWitness(chunk_validators.into_keys().collect(), witness),
));
Ok(())
}
}
19 changes: 18 additions & 1 deletion chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! This client works completely synchronously and must be operated by some async actor outside.

use crate::adapter::ProcessTxResponse;
use crate::chunk_validation::ChunkValidator;
use crate::debug::BlockProductionTracker;
use crate::debug::PRODUCTION_TIMES_CACHE_SIZE;
use crate::sync::adapter::SyncShardInfo;
Expand All @@ -15,6 +16,7 @@ use crate::{metrics, SyncStatus};
use actix_rt::ArbiterHandle;
use itertools::Itertools;
use lru::LruCache;
use near_async::messaging::IntoSender;
use near_async::messaging::{CanSend, Sender};
use near_chain::chain::VerifyBlockHashAndSignatureResult;
use near_chain::chain::{
Expand Down Expand Up @@ -139,7 +141,7 @@ pub struct Client {
>,
pub do_not_include_chunks_from: LruCache<(EpochId, AccountId), ()>,
/// Network adapter.
network_adapter: PeerManagerAdapter,
pub network_adapter: PeerManagerAdapter,
/// Signer for block producer (if present).
pub validator_signer: Option<Arc<dyn ValidatorSigner>>,
/// Approvals for which we do not have the block yet
Expand Down Expand Up @@ -178,6 +180,8 @@ pub struct Client {
tier1_accounts_cache: Option<(EpochId, Arc<AccountKeys>)>,
/// Used when it is needed to create flat storage in background for some shards.
flat_storage_creator: Option<FlatStorageCreator>,

pub chunk_validator: ChunkValidator,
}

impl Client {
Expand Down Expand Up @@ -319,6 +323,12 @@ impl Client {
validator_signer.clone(),
doomslug_threshold_mode,
);
let chunk_validator = ChunkValidator::new(
validator_signer.clone(),
epoch_manager.clone(),
network_adapter.clone().into_sender(),
runtime_adapter.clone(),
);
Ok(Self {
#[cfg(feature = "test_features")]
adv_produce_blocks: None,
Expand Down Expand Up @@ -360,6 +370,7 @@ impl Client {
chunk_production_info: lru::LruCache::new(PRODUCTION_TIMES_CACHE_SIZE),
tier1_accounts_cache: None,
flat_storage_creator,
chunk_validator,
})
}

Expand Down Expand Up @@ -1721,6 +1732,12 @@ impl Client {
let last_header = Chain::get_prev_chunk_header(epoch_manager, block, shard_id).unwrap();
match self.produce_chunk(*block.hash(), &epoch_id, last_header, next_height, shard_id) {
Ok(Some((encoded_chunk, merkle_paths, receipts))) => {
if let Err(err) = self.send_chunk_state_witness_to_chunk_validators(
&epoch_id,
&encoded_chunk.cloned_header(),
) {
tracing::error!(target: "client", ?err, "Failed to send chunk state witness to chunk validators");
}
self.persist_and_distribute_encoded_chunk(
encoded_chunk,
merkle_paths,
Expand Down
20 changes: 18 additions & 2 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
//! https://github.com/near/nearcore/issues/7899

use crate::adapter::{
BlockApproval, BlockHeadersResponse, BlockResponse, ProcessTxRequest, ProcessTxResponse,
RecvChallenge, SetNetworkInfo, StateResponse,
BlockApproval, BlockHeadersResponse, BlockResponse, ChunkStateWitnessMessage, ProcessTxRequest,
ProcessTxResponse, RecvChallenge, SetNetworkInfo, StateResponse,
};
#[cfg(feature = "test_features")]
use crate::client::AdvProduceBlocksMode;
Expand Down Expand Up @@ -1935,6 +1935,22 @@ impl Handler<WithSpanContext<SyncMessage>> for ClientActor {
}
}

impl Handler<WithSpanContext<ChunkStateWitnessMessage>> for ClientActor {
type Result = ();

#[perf]
fn handle(
&mut self,
msg: WithSpanContext<ChunkStateWitnessMessage>,
_: &mut Context<Self>,
) -> Self::Result {
let (_span, msg) = handler_debug_span!(target: "client", msg);
if let Err(err) = self.client.process_chunk_state_witness(msg.0) {
tracing::error!(target: "client", ?err, "Error processing chunk state witness");
}
}
}

/// Returns random seed sampled from the current thread
pub fn random_seed_from_thread() -> RngSeed {
let mut rng_seed: RngSeed = [0; 32];
Expand Down
1 change: 1 addition & 0 deletions chain/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub use near_client_primitives::debug::DebugStatus;

pub mod adapter;
pub mod adversarial;
mod chunk_validation;
mod client;
mod client_actor;
mod config_updater;
Expand Down
Loading
Loading