Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[stateless_validation] Implement distribution of state witness parts #11115

Merged
merged 3 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 139 additions & 20 deletions chain/client/src/stateless_validation/state_witness_actions.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use std::collections::HashMap;
use std::sync::Arc;

use itertools::Itertools;
use near_async::messaging::CanSend;
use near_async::time::Clock;
use near_chain::Error;
use near_epoch_manager::EpochManagerAdapter;
use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest};
use near_primitives::reed_solomon::ReedSolomonWrapper;
use near_primitives::sharding::ShardChunkHeader;
use near_primitives::stateless_validation::{
ChunkStateWitness, ChunkStateWitnessAck, EncodedChunkStateWitness, PartialEncodedStateWitness,
SignedEncodedChunkStateWitness,
};
use near_primitives::types::{AccountId, EpochId};
use near_primitives::validator_signer::ValidatorSigner;

use crate::metrics;
Expand All @@ -25,6 +30,9 @@ pub struct StateWitnessActions {
epoch_manager: Arc<dyn EpochManagerAdapter>,
/// Tracks a collection of state witnesses sent from chunk producers to chunk validators.
state_witness_tracker: ChunkStateWitnessTracker,
/// Reed Solomon encoder for encoding state witness parts.
/// We keep one wrapper for each length of chunk_validators to avoid re-creating the encoder.
rs_map: HashMap<usize, ReedSolomonWrapper>,
}

impl StateWitnessActions {
Expand All @@ -39,48 +47,125 @@ impl StateWitnessActions {
my_signer,
epoch_manager,
state_witness_tracker: ChunkStateWitnessTracker::new(clock),
rs_map: HashMap::new(),
}
}

/// Handles a request to distribute a chunk state witness to the chunk validators assigned to
/// the chunk: looks up the ordered chunk validators, compresses the witness, records it in the
/// witness tracker and sends it out.
///
/// NOTE(review): this span comes from a rendered diff and interleaves removed and added lines
/// (e.g. the two consecutive `let DistributeStateWitnessRequest { .. } = msg;` destructurings
/// and the doubled arguments to `get_chunk_validator_assignments`), so it does not compile
/// as-is. Presumably the later line of each pair is the post-change version — TODO confirm
/// against the merged file.
pub fn handle_distribute_state_witness_request(
&mut self,
msg: DistributeStateWitnessRequest,
) -> Result<(), Error> {
let DistributeStateWitnessRequest { state_witness } = msg;
let DistributeStateWitnessRequest { epoch_id, chunk_header, state_witness } = msg;

let signed_witness = create_signed_witness(&state_witness, self.my_signer.as_ref())?;

// Look up the chunk validators for the chunk's epoch, shard and creation height.
let mut chunk_validators = self
let chunk_validators = self
.epoch_manager
.get_chunk_validator_assignments(
&state_witness.epoch_id,
state_witness.chunk_header.shard_id(),
state_witness.chunk_header.height_created(),
&epoch_id,
chunk_header.shard_id(),
chunk_header.height_created(),
)?
.ordered_chunk_validators();

tracing::debug!(
target: "stateless_validation",
"Sending chunk state witness for chunk {:?} to chunk validators {:?}",
state_witness.chunk_header.chunk_hash(),
chunk_header.chunk_hash(),
chunk_validators,
);

let witness_bytes = compress_witness(&state_witness)?;

// Record the witness in order to match the incoming acks for measuring round-trip times.
// See process_chunk_state_witness_ack for the handling of the ack messages.
self.state_witness_tracker.record_witness_sent(
&state_witness,
signed_witness.witness_bytes.size_bytes(),
witness_bytes.size_bytes(),
chunk_validators.len(),
);

// TODO(stateless_validation): Replace with call to send_state_witness_parts after full implementation
self.send_state_witness(witness_bytes, chunk_validators);

Ok(())
}

// TODO(stateless_validation): Deprecate once we send state witness in parts.
// Legacy distribution path: the chunk producer signs the complete encoded witness and asks the
// network layer to deliver that single signed message to every chunk validator except itself.
fn send_state_witness(
    &self,
    witness_bytes: EncodedChunkStateWitness,
    mut chunk_validators: Vec<AccountId>,
) {
    // The network can't send messages to ourselves, so drop our own account from the recipients.
    let my_account_id = self.my_signer.validator_id();
    chunk_validators.retain(|account_id| account_id != my_account_id);

    // Sign first (this only borrows the bytes), then move the bytes into the signed wrapper.
    let signature = self.my_signer.sign_chunk_state_witness(&witness_bytes);
    let signed_witness = SignedEncodedChunkStateWitness { signature, witness_bytes };

    let request = NetworkRequests::ChunkStateWitness(chunk_validators, signed_witness);
    self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(request));
}

// Splits the encoded state witness into Reed Solomon parts, one per chunk validator, and hands
// them to the network layer. The validator at position `i` of `chunk_validators` is the owner of
// part `i`; owners are expected to forward their part to the other chunk validators, and each
// chunk validator collects parts to reconstruct the state witness.
#[allow(unused)]
fn send_state_witness_parts(
    &mut self,
    epoch_id: EpochId,
    chunk_header: ShardChunkHeader,
    witness_bytes: EncodedChunkStateWitness,
    chunk_validators: Vec<AccountId>,
) -> Result<(), Error> {
    let total_parts = chunk_validators.len();

    // Encoders are cached per number of chunk validators; build one on first use.
    // Roughly two thirds of the parts carry data (at least one), the rest are parity.
    let rs = self.rs_map.entry(total_parts).or_insert_with(|| {
        let data_parts = (total_parts * 2 / 3).max(1);
        ReedSolomonWrapper::new(data_parts, total_parts - data_parts)
    });
    let (parts, encoded_length) = rs.encode(witness_bytes);

    // Pair every chunk validator with the signed partial witness for the part it owns.
    let mut validator_witness_tuple = Vec::with_capacity(total_parts);
    for (part_ord, (chunk_validator, part)) in chunk_validators.iter().zip_eq(parts).enumerate() {
        let partial_witness = PartialEncodedStateWitness::new(
            epoch_id.clone(),
            chunk_header.clone(),
            part_ord,
            // Unwrapping is fine: the parts were just produced by `encode` above and all of
            // them are expected to be present.
            part.unwrap().to_vec(),
            encoded_length,
            self.my_signer.as_ref(),
        );
        validator_witness_tuple.push((chunk_validator.clone(), partial_witness));
    }

    // We can't send a network message to ourselves, so if we own one of the parts we issue the
    // PartialEncodedStateWitnessForward message for it directly.
    let my_account_id = self.my_signer.validator_id();
    let own_entry =
        validator_witness_tuple.iter().find(|(validator, _)| validator == my_account_id);
    if let Some((_, partial_witness)) = own_entry {
        let forward = NetworkRequests::PartialEncodedStateWitnessForward(
            chunk_validators,
            partial_witness.clone(),
        );
        self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(forward));
    }

    // Send every part to the chunk validator that owns it.
    let request = NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple);
    self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(request));
    Ok(())
}

Expand All @@ -98,37 +183,71 @@ impl StateWitnessActions {
&self,
partial_witness: PartialEncodedStateWitness,
) -> Result<(), Error> {
unimplemented!("{:?}", partial_witness)
// Validate the partial encoded state witness.
self.validate_partial_encoded_state_witness(&partial_witness)?;

// Store the partial encoded state witness for self.
self.store_partial_encoded_state_witness(&partial_witness)?;

// Forward the part to all the chunk validators.
let chunk_validators = self
.epoch_manager
.get_chunk_validator_assignments(
partial_witness.epoch_id(),
partial_witness.chunk_header().shard_id(),
partial_witness.chunk_header().height_created(),
)?
.ordered_chunk_validators();

self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitnessForward(chunk_validators, partial_witness),
));

Ok(())
}

/// Handles an incoming partial_encoded_state_witness_forward message.
///
/// The part is validated first and stored only if validation succeeds; an error from either
/// step is returned to the caller. Nothing is sent to the network from here.
pub fn handle_partial_encoded_state_witness_forward(
    &self,
    partial_witness: PartialEncodedStateWitness,
) -> Result<(), Error> {
    // Reject the part before it can reach our local store.
    self.validate_partial_encoded_state_witness(&partial_witness)?;

    // Keep the part for ourselves; the outcome of storing is the outcome of the handler.
    self.store_partial_encoded_state_witness(&partial_witness)
}

/// Validates a partial encoded state witness.
///
/// Not implemented yet: the body is a stub that always panics via `unimplemented!`, with the
/// `Debug` rendering of `partial_witness` as the panic message.
fn validate_partial_encoded_state_witness(
&self,
partial_witness: &PartialEncodedStateWitness,
) -> Result<(), Error> {
unimplemented!("{:?}", partial_witness)
}

/// Stores a partial encoded state witness.
///
/// Not implemented yet: the body is a stub that always panics via `unimplemented!`, with the
/// `Debug` rendering of `partial_witness` as the panic message.
fn store_partial_encoded_state_witness(
&self,
partial_witness: &PartialEncodedStateWitness,
) -> Result<(), Error> {
unimplemented!("{:?}", partial_witness)
}
}

// NOTE(review): this span is taken from a rendered diff and interleaves removed and added lines:
// both the `create_signed_witness` and `compress_witness` headers appear, as do both
// `Ok(signed_witness)` and `Ok(witness_bytes)`. Presumably `compress_witness` is the post-change
// function — TODO confirm against the merged file.
//
// Encodes `witness` with `EncodedChunkStateWitness::encode`, timing the call with the
// `CHUNK_STATE_WITNESS_ENCODE_TIME` metric and recording the encoded and raw sizes in the
// `CHUNK_STATE_WITNESS_TOTAL_SIZE` and `CHUNK_STATE_WITNESS_RAW_SIZE` metrics. All three metrics
// are labelled with the shard id of the witness' chunk header.
fn create_signed_witness(
witness: &ChunkStateWitness,
my_signer: &dyn ValidatorSigner,
) -> Result<SignedEncodedChunkStateWitness, Error> {
fn compress_witness(witness: &ChunkStateWitness) -> Result<EncodedChunkStateWitness, Error> {
let shard_id_label = witness.chunk_header.shard_id().to_string();
// Start the timer right before encoding; it is observed as soon as encoding returns.
let encode_timer = metrics::CHUNK_STATE_WITNESS_ENCODE_TIME
.with_label_values(&[shard_id_label.as_str()])
.start_timer();
let (witness_bytes, raw_witness_size) = EncodedChunkStateWitness::encode(&witness)?;
encode_timer.observe_duration();
let signed_witness = SignedEncodedChunkStateWitness {
signature: my_signer.sign_chunk_state_witness(&witness_bytes),
witness_bytes,
};

metrics::CHUNK_STATE_WITNESS_TOTAL_SIZE
.with_label_values(&[shard_id_label.as_str()])
.observe(signed_witness.witness_bytes.size_bytes() as f64);
.observe(witness_bytes.size_bytes() as f64);
metrics::CHUNK_STATE_WITNESS_RAW_SIZE
.with_label_values(&[shard_id_label.as_str()])
.observe(raw_witness_size as f64);
Ok(signed_witness)
Ok(witness_bytes)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use near_network::state_witness::{
use near_network::types::PeerManagerAdapter;
use near_o11y::{handler_debug_span, WithSpanContext};
use near_performance_metrics_macros::perf;
use near_primitives::sharding::ShardChunkHeader;
use near_primitives::stateless_validation::ChunkStateWitness;
use near_primitives::types::EpochId;
use near_primitives::validator_signer::ValidatorSigner;

use super::state_witness_actions::StateWitnessActions;
Expand Down Expand Up @@ -43,6 +45,8 @@ impl actix::Actor for StateWitnessActor {
/// Request to distribute a chunk state witness; it is destructured and processed by
/// `StateWitnessActions::handle_distribute_state_witness_request`.
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub struct DistributeStateWitnessRequest {
/// Epoch passed to the chunk validator assignment lookup.
pub epoch_id: EpochId,
/// Header of the chunk; its shard id and creation height are passed to the chunk validator
/// assignment lookup.
pub chunk_header: ShardChunkHeader,
/// The state witness to be compressed and sent out.
pub state_witness: ChunkStateWitness,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ impl Client {
);
}

self.state_witness_adapter.send(DistributeStateWitnessRequest { state_witness });
self.state_witness_adapter.send(DistributeStateWitnessRequest {
epoch_id: epoch_id.clone(),
chunk_header,
state_witness,
});
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions chain/network/src/peer/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl PeerConfig {
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Event {
ShardsManager(ShardsManagerRequestFromNetwork),
Client(ClientSenderForNetworkInput),
Expand Down
1 change: 1 addition & 0 deletions chain/network/src/peer_manager/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl actix::Handler<WithNetworkState> for PeerManagerActor {
}

#[derive(Debug, PartialEq, Eq, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum Event {
ShardsManager(ShardsManagerRequestFromNetwork),
Client(ClientSenderForNetworkInput),
Expand Down
1 change: 1 addition & 0 deletions chain/network/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ pub enum NetworkRequests {
TxStatus(AccountId, AccountId, CryptoHash),
/// A challenge to invalidate a block.
Challenge(Challenge),
/// TODO(stateless_validation): Deprecate once we send state witness in parts.
/// A chunk's state witness.
ChunkStateWitness(Vec<AccountId>, SignedEncodedChunkStateWitness),
/// Acknowledgement to a chunk's state witness, sent back to the originating chunk producer.
Expand Down
Loading
Loading