Skip to content

Commit

Permalink
Merge of #6224
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Aug 12, 2024
2 parents f2fdbe7 + 815599c commit 81ee77d
Show file tree
Hide file tree
Showing 15 changed files with 624 additions and 34 deletions.
28 changes: 27 additions & 1 deletion beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ pub struct BeaconProcessorQueueLengths {
bbroots_queue: usize,
blbroots_queue: usize,
blbrange_queue: usize,
dcbroots_queue: usize,
dcbrange_queue: usize,
gossip_bls_to_execution_change_queue: usize,
lc_bootstrap_queue: usize,
lc_optimistic_update_queue: usize,
Expand Down Expand Up @@ -172,6 +174,9 @@ impl BeaconProcessorQueueLengths {
bbroots_queue: 1024,
blbroots_queue: 1024,
blbrange_queue: 1024,
// TODO(das): pick proper values
dcbroots_queue: 1024,
dcbrange_queue: 1024,
gossip_bls_to_execution_change_queue: 16384,
lc_bootstrap_queue: 1024,
lc_optimistic_update_queue: 512,
Expand Down Expand Up @@ -230,6 +235,8 @@ pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request";
pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request";
pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request";
pub const DATA_COLUMNS_BY_ROOTS_REQUEST: &str = "data_columns_by_roots_request";
pub const DATA_COLUMNS_BY_RANGE_REQUEST: &str = "data_columns_by_range_request";
pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap";
pub const LIGHT_CLIENT_FINALITY_UPDATE_REQUEST: &str = "light_client_finality_update_request";
pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE_REQUEST: &str = "light_client_optimistic_update_request";
Expand Down Expand Up @@ -609,6 +616,8 @@ pub enum Work<E: EthSpec> {
BlocksByRootsRequest(AsyncFn),
BlobsByRangeRequest(BlockingFn),
BlobsByRootsRequest(BlockingFn),
DataColumnsByRootsRequest(BlockingFn),
DataColumnsByRangeRequest(BlockingFn),
GossipBlsToExecutionChange(BlockingFn),
LightClientBootstrapRequest(BlockingFn),
LightClientOptimisticUpdateRequest(BlockingFn),
Expand Down Expand Up @@ -652,6 +661,8 @@ impl<E: EthSpec> Work<E> {
Work::BlocksByRootsRequest(_) => BLOCKS_BY_ROOTS_REQUEST,
Work::BlobsByRangeRequest(_) => BLOBS_BY_RANGE_REQUEST,
Work::BlobsByRootsRequest(_) => BLOBS_BY_ROOTS_REQUEST,
Work::DataColumnsByRootsRequest(_) => DATA_COLUMNS_BY_ROOTS_REQUEST,
Work::DataColumnsByRangeRequest(_) => DATA_COLUMNS_BY_RANGE_REQUEST,
Work::LightClientBootstrapRequest(_) => LIGHT_CLIENT_BOOTSTRAP_REQUEST,
Work::LightClientOptimisticUpdateRequest(_) => LIGHT_CLIENT_OPTIMISTIC_UPDATE_REQUEST,
Work::LightClientFinalityUpdateRequest(_) => LIGHT_CLIENT_FINALITY_UPDATE_REQUEST,
Expand Down Expand Up @@ -816,6 +827,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut bbroots_queue = FifoQueue::new(queue_lengths.bbroots_queue);
let mut blbroots_queue = FifoQueue::new(queue_lengths.blbroots_queue);
let mut blbrange_queue = FifoQueue::new(queue_lengths.blbrange_queue);
let mut dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue);
let mut dcbrange_queue = FifoQueue::new(queue_lengths.dcbrange_queue);

let mut gossip_bls_to_execution_change_queue =
FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue);
Expand Down Expand Up @@ -1118,6 +1131,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = blbroots_queue.pop() {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = dcbroots_queue.pop() {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = dcbrange_queue.pop() {
self.spawn_worker(item, idle_tx);
// Check slashings after all other consensus messages so we prioritize
// following head.
//
Expand Down Expand Up @@ -1282,6 +1299,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::BlobsByRootsRequest { .. } => {
blbroots_queue.push(work, work_id, &self.log)
}
Work::DataColumnsByRootsRequest { .. } => {
dcbroots_queue.push(work, work_id, &self.log)
}
Work::DataColumnsByRangeRequest { .. } => {
dcbrange_queue.push(work, work_id, &self.log)
}
Work::UnknownLightClientOptimisticUpdate { .. } => {
unknown_light_client_update_queue.push(work, work_id, &self.log)
}
Expand Down Expand Up @@ -1483,7 +1506,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
| Work::GossipDataColumnSidecar(work) => task_spawner.spawn_async(async move {
work.await;
}),
Work::BlobsByRangeRequest(process_fn) | Work::BlobsByRootsRequest(process_fn) => {
Work::BlobsByRangeRequest(process_fn)
| Work::BlobsByRootsRequest(process_fn)
| Work::DataColumnsByRootsRequest(process_fn)
| Work::DataColumnsByRangeRequest(process_fn) => {
task_spawner.spawn_blocking(process_fn)
}
Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
Expand Down
6 changes: 6 additions & 0 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,8 @@ impl<E: EthSpec> PeerManager<E> {
Protocol::LightClientOptimisticUpdate => return,
Protocol::LightClientFinalityUpdate => return,
Protocol::BlobsByRoot => PeerAction::MidToleranceError,
Protocol::DataColumnsByRoot => PeerAction::MidToleranceError,
Protocol::DataColumnsByRange => PeerAction::MidToleranceError,
Protocol::Goodbye => PeerAction::LowToleranceError,
Protocol::MetaData => PeerAction::LowToleranceError,
Protocol::Status => PeerAction::LowToleranceError,
Expand All @@ -587,6 +589,8 @@ impl<E: EthSpec> PeerManager<E> {
Protocol::BlocksByRoot => return,
Protocol::BlobsByRange => return,
Protocol::BlobsByRoot => return,
Protocol::DataColumnsByRoot => return,
Protocol::DataColumnsByRange => return,
Protocol::Goodbye => return,
Protocol::LightClientBootstrap => return,
Protocol::LightClientOptimisticUpdate => return,
Expand All @@ -607,6 +611,8 @@ impl<E: EthSpec> PeerManager<E> {
Protocol::BlocksByRoot => PeerAction::MidToleranceError,
Protocol::BlobsByRange => PeerAction::MidToleranceError,
Protocol::BlobsByRoot => PeerAction::MidToleranceError,
Protocol::DataColumnsByRoot => PeerAction::MidToleranceError,
Protocol::DataColumnsByRange => PeerAction::MidToleranceError,
Protocol::LightClientBootstrap => return,
Protocol::LightClientOptimisticUpdate => return,
Protocol::LightClientFinalityUpdate => return,
Expand Down
140 changes: 134 additions & 6 deletions beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ use std::marker::PhantomData;
use std::sync::Arc;
use tokio_util::codec::{Decoder, Encoder};
use types::{
BlobSidecar, ChainSpec, EthSpec, ForkContext, ForkName, Hash256, LightClientBootstrap,
LightClientFinalityUpdate, LightClientOptimisticUpdate, RuntimeVariableList, SignedBeaconBlock,
SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix,
SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra,
BlobSidecar, ChainSpec, DataColumnSidecar, EthSpec, ForkContext, ForkName, Hash256,
LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate,
RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase,
SignedBeaconBlockBellatrix, SignedBeaconBlockCapella, SignedBeaconBlockDeneb,
SignedBeaconBlockElectra,
};
use unsigned_varint::codec::Uvi;

Expand Down Expand Up @@ -70,6 +71,8 @@ impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for SSZSnappyInboundCodec<E> {
RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(),
RPCResponse::BlobsByRange(res) => res.as_ssz_bytes(),
RPCResponse::BlobsByRoot(res) => res.as_ssz_bytes(),
RPCResponse::DataColumnsByRoot(res) => res.as_ssz_bytes(),
RPCResponse::DataColumnsByRange(res) => res.as_ssz_bytes(),
RPCResponse::LightClientBootstrap(res) => res.as_ssz_bytes(),
RPCResponse::LightClientOptimisticUpdate(res) => res.as_ssz_bytes(),
RPCResponse::LightClientFinalityUpdate(res) => res.as_ssz_bytes(),
Expand Down Expand Up @@ -224,6 +227,8 @@ impl<E: EthSpec> Encoder<OutboundRequest<E>> for SSZSnappyOutboundCodec<E> {
},
OutboundRequest::BlobsByRange(req) => req.as_ssz_bytes(),
OutboundRequest::BlobsByRoot(req) => req.blob_ids.as_ssz_bytes(),
OutboundRequest::DataColumnsByRange(req) => req.as_ssz_bytes(),
OutboundRequest::DataColumnsByRoot(req) => req.data_column_ids.as_ssz_bytes(),
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode
};
Expand Down Expand Up @@ -414,7 +419,12 @@ fn context_bytes<E: EthSpec>(
}
};
}
RPCResponse::BlobsByRange(_) | RPCResponse::BlobsByRoot(_) => {
RPCResponse::BlobsByRange(_)
| RPCResponse::BlobsByRoot(_)
| RPCResponse::DataColumnsByRoot(_)
| RPCResponse::DataColumnsByRange(_) => {
// TODO(das): If DataColumnSidecar is defined as an Electra type, update the
// context bytes to point to ForkName::Electra
return fork_context.to_context_bytes(ForkName::Deneb);
}
RPCResponse::LightClientBootstrap(lc_bootstrap) => {
Expand Down Expand Up @@ -512,6 +522,17 @@ fn handle_rpc_request<E: EthSpec>(
)?,
})))
}
SupportedProtocol::DataColumnsByRootV1 => Ok(Some(InboundRequest::DataColumnsByRoot(
DataColumnsByRootRequest {
data_column_ids: RuntimeVariableList::from_ssz_bytes(
decoded_buffer,
spec.max_request_data_column_sidecars as usize,
)?,
},
))),
SupportedProtocol::DataColumnsByRangeV1 => Ok(Some(InboundRequest::DataColumnsByRange(
DataColumnsByRangeRequest::from_ssz_bytes(decoded_buffer)?,
))),
SupportedProtocol::PingV1 => Ok(Some(InboundRequest::Ping(Ping {
data: u64::from_ssz_bytes(decoded_buffer)?,
}))),
Expand Down Expand Up @@ -604,6 +625,51 @@ fn handle_rpc_response<E: EthSpec>(
),
)),
},
SupportedProtocol::DataColumnsByRootV1 => match fork_name {
Some(fork_name) => {
// TODO(das): PeerDAS is currently supported for both deneb and electra. This check
// does not advertise the topic on deneb, simply allows it to decode it. Advertise
// logic is in `SupportedTopic::currently_supported`.
if fork_name.deneb_enabled() {
Ok(Some(RPCResponse::DataColumnsByRoot(Arc::new(
DataColumnSidecar::from_ssz_bytes(decoded_buffer)?,
))))
} else {
Err(RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
"Invalid fork name for data columns by root".to_string(),
))
}
}
None => Err(RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
format!(
"No context bytes provided for {:?} response",
versioned_protocol
),
)),
},
SupportedProtocol::DataColumnsByRangeV1 => match fork_name {
Some(fork_name) => {
if fork_name.deneb_enabled() {
Ok(Some(RPCResponse::DataColumnsByRange(Arc::new(
DataColumnSidecar::from_ssz_bytes(decoded_buffer)?,
))))
} else {
Err(RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
"Invalid fork name for data columns by range".to_string(),
))
}
}
None => Err(RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
format!(
"No context bytes provided for {:?} response",
versioned_protocol
),
)),
},
SupportedProtocol::PingV1 => Ok(Some(RPCResponse::Pong(Ping {
data: u64::from_ssz_bytes(decoded_buffer)?,
}))),
Expand Down Expand Up @@ -747,7 +813,8 @@ mod tests {
use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield};
use types::{
blob_sidecar::BlobIdentifier, BeaconBlock, BeaconBlockAltair, BeaconBlockBase,
BeaconBlockBellatrix, EmptyBlock, Epoch, FullPayload, Signature, Slot,
BeaconBlockBellatrix, DataColumnIdentifier, EmptyBlock, Epoch, FullPayload, Signature,
Slot,
};

type Spec = types::MainnetEthSpec;
Expand Down Expand Up @@ -794,6 +861,10 @@ mod tests {
Arc::new(BlobSidecar::empty())
}

fn empty_data_column_sidecar() -> Arc<DataColumnSidecar<Spec>> {
Arc::new(DataColumnSidecar::empty())
}

/// Bellatrix block with length < max_rpc_size.
fn bellatrix_block_small(
fork_context: &ForkContext,
Expand Down Expand Up @@ -855,6 +926,27 @@ mod tests {
}
}

fn dcbrange_request() -> DataColumnsByRangeRequest {
DataColumnsByRangeRequest {
start_slot: 0,
count: 10,
columns: vec![1, 2, 3],
}
}

fn dcbroot_request(spec: &ChainSpec) -> DataColumnsByRootRequest {
DataColumnsByRootRequest {
data_column_ids: RuntimeVariableList::new(
vec![DataColumnIdentifier {
block_root: Hash256::zero(),
index: 0,
}],
spec.max_request_data_column_sidecars as usize,
)
.unwrap(),
}
}

fn bbroot_request_v1(spec: &ChainSpec) -> BlocksByRootRequest {
BlocksByRootRequest::new_v1(vec![Hash256::zero()], spec)
}
Expand Down Expand Up @@ -1012,6 +1104,12 @@ mod tests {
OutboundRequest::BlobsByRoot(bbroot) => {
assert_eq!(decoded, InboundRequest::BlobsByRoot(bbroot))
}
OutboundRequest::DataColumnsByRoot(dcbroot) => {
assert_eq!(decoded, InboundRequest::DataColumnsByRoot(dcbroot))
}
OutboundRequest::DataColumnsByRange(dcbrange) => {
assert_eq!(decoded, InboundRequest::DataColumnsByRange(dcbrange))
}
OutboundRequest::Ping(ping) => {
assert_eq!(decoded, InboundRequest::Ping(ping))
}
Expand Down Expand Up @@ -1138,6 +1236,34 @@ mod tests {
),
Ok(Some(RPCResponse::BlobsByRoot(empty_blob_sidecar()))),
);

assert_eq!(
encode_then_decode_response(
SupportedProtocol::DataColumnsByRangeV1,
RPCCodedResponse::Success(RPCResponse::DataColumnsByRange(
empty_data_column_sidecar()
)),
ForkName::Deneb,
&chain_spec
),
Ok(Some(RPCResponse::DataColumnsByRange(
empty_data_column_sidecar()
))),
);

assert_eq!(
encode_then_decode_response(
SupportedProtocol::DataColumnsByRootV1,
RPCCodedResponse::Success(RPCResponse::DataColumnsByRoot(
empty_data_column_sidecar()
)),
ForkName::Deneb,
&chain_spec
),
Ok(Some(RPCResponse::DataColumnsByRoot(
empty_data_column_sidecar()
))),
);
}

// Test RPCResponse encoding/decoding for V1 messages
Expand Down Expand Up @@ -1491,6 +1617,8 @@ mod tests {
OutboundRequest::MetaData(MetadataRequest::new_v1()),
OutboundRequest::BlobsByRange(blbrange_request()),
OutboundRequest::BlobsByRoot(blbroot_request(&chain_spec)),
OutboundRequest::DataColumnsByRange(dcbrange_request()),
OutboundRequest::DataColumnsByRoot(dcbroot_request(&chain_spec)),
OutboundRequest::MetaData(MetadataRequest::new_v2()),
];

Expand Down
Loading

0 comments on commit 81ee77d

Please sign in to comment.