From 61e55cb0263b7a843024e9a2f2d11c89e3077a8f Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 31 Mar 2022 00:47:34 +0530 Subject: [PATCH 01/10] Fix limits for blocks by range --- .../lighthouse_network/src/rpc/protocol.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 1e65041991d..e235d6edc01 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -63,7 +63,10 @@ lazy_static! { /// The `BeaconBlockMerge` block has an `ExecutionPayload` field which has a max size ~16 GiB for future proofing. /// We calculate the value from its fields instead of constructing the block and checking the length. - pub static ref SIGNED_BEACON_BLOCK_MERGE_MAX: usize = types::ExecutionPayload::::max_execution_payload_size(); + pub static ref SIGNED_BEACON_BLOCK_MERGE_MAX: usize = + *SIGNED_BEACON_BLOCK_MERGE_MIN + + types::ExecutionPayload::::max_execution_payload_size() + - types::ExecutionPayload::::empty().as_ssz_bytes().len(); pub static ref BLOCKS_BY_ROOT_REQUEST_MIN: usize = VariableList::::from(Vec::::new()) @@ -294,12 +297,18 @@ impl ProtocolId { ), Protocol::BlocksByRoot => RpcLimits::new( std::cmp::min( - *SIGNED_BEACON_BLOCK_ALTAIR_MIN, - *SIGNED_BEACON_BLOCK_BASE_MIN, + std::cmp::min( + *SIGNED_BEACON_BLOCK_ALTAIR_MIN, + *SIGNED_BEACON_BLOCK_BASE_MIN, + ), + *SIGNED_BEACON_BLOCK_MERGE_MIN, ), std::cmp::max( - *SIGNED_BEACON_BLOCK_ALTAIR_MAX, - *SIGNED_BEACON_BLOCK_BASE_MAX, + std::cmp::max( + *SIGNED_BEACON_BLOCK_ALTAIR_MAX, + *SIGNED_BEACON_BLOCK_BASE_MAX, + ), + *SIGNED_BEACON_BLOCK_MERGE_MAX, ), ), From 02c00c62b44dcc1d90a98779e4f6fa7f3d411597 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 31 Mar 2022 01:04:59 +0530 Subject: [PATCH 02/10] Return limits based on current fork --- .../lighthouse_network/src/rpc/codec/base.rs | 4 +- .../src/rpc/codec/ssz_snappy.rs | 4 +- .../lighthouse_network/src/rpc/protocol.rs | 55 ++++++++----------- 3 files changed, 29 insertions(+), 34 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/codec/base.rs b/beacon_node/lighthouse_network/src/rpc/codec/base.rs index eca05787853..02eaeac73e2 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/base.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/base.rs @@ -260,9 +260,9 @@ mod tests { ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy); // Response limits - let limit = protocol_id.rpc_response_limits::(); - let mut max = encode_len(limit.max + 1); let fork_context = Arc::new(fork_context()); + let limit = protocol_id.rpc_response_limits::(&fork_context); + let mut max = encode_len(limit.max + 1); let mut codec = SSZSnappyOutboundCodec::::new( protocol_id.clone(), 1_048_576, diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index 0924dca0c08..bf15bafe577 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -279,7 +279,9 @@ impl Decoder for SSZSnappyOutboundCodec { // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of // packet size for ssz container corresponding to `self.protocol`. 
- let ssz_limits = self.protocol.rpc_response_limits::(); + let ssz_limits = self + .protocol + .rpc_response_limits::(&self.fork_context); if ssz_limits.is_out_of_bounds(length, self.max_packet_size) { return Err(RPCError::InvalidData); } diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index e235d6edc01..29b86f1af28 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -109,10 +109,9 @@ const REQUEST_TIMEOUT: u64 = 15; /// Returns the maximum bytes that can be sent across the RPC. pub fn max_rpc_size(fork_context: &ForkContext) -> usize { - if fork_context.fork_exists(ForkName::Merge) { - MAX_RPC_SIZE_POST_MERGE - } else { - MAX_RPC_SIZE + match fork_context.current_fork() { + ForkName::Merge => MAX_RPC_SIZE_POST_MERGE, + _ => MAX_RPC_SIZE, } } @@ -272,45 +271,39 @@ impl ProtocolId { } /// Returns min and max size for messages of given protocol id responses. - pub fn rpc_response_limits(&self) -> RpcLimits { + pub fn rpc_response_limits(&self, fork_context: &ForkContext) -> RpcLimits { match self.message_name { Protocol::Status => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), ), Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response - Protocol::BlocksByRange => RpcLimits::new( - std::cmp::min( - std::cmp::min( - *SIGNED_BEACON_BLOCK_ALTAIR_MIN, - *SIGNED_BEACON_BLOCK_BASE_MIN, - ), - *SIGNED_BEACON_BLOCK_MERGE_MIN, + Protocol::BlocksByRange => match fork_context.current_fork() { + ForkName::Base => { + RpcLimits::new(*SIGNED_BEACON_BLOCK_BASE_MIN, *SIGNED_BEACON_BLOCK_BASE_MAX) + } + ForkName::Altair => RpcLimits::new( + *SIGNED_BEACON_BLOCK_ALTAIR_MIN, + *SIGNED_BEACON_BLOCK_ALTAIR_MAX, ), - std::cmp::max( - std::cmp::max( - *SIGNED_BEACON_BLOCK_ALTAIR_MAX, - *SIGNED_BEACON_BLOCK_BASE_MAX, - ), + ForkName::Merge => RpcLimits::new( + *SIGNED_BEACON_BLOCK_MERGE_MIN, *SIGNED_BEACON_BLOCK_MERGE_MAX, ), - ), - Protocol::BlocksByRoot => RpcLimits::new( - std::cmp::min( - std::cmp::min( - *SIGNED_BEACON_BLOCK_ALTAIR_MIN, - *SIGNED_BEACON_BLOCK_BASE_MIN, - ), - *SIGNED_BEACON_BLOCK_MERGE_MIN, + }, + Protocol::BlocksByRoot => match fork_context.current_fork() { + ForkName::Base => { + RpcLimits::new(*SIGNED_BEACON_BLOCK_BASE_MIN, *SIGNED_BEACON_BLOCK_BASE_MAX) + } + ForkName::Altair => RpcLimits::new( + *SIGNED_BEACON_BLOCK_ALTAIR_MIN, + *SIGNED_BEACON_BLOCK_ALTAIR_MAX, ), - std::cmp::max( - std::cmp::max( - *SIGNED_BEACON_BLOCK_ALTAIR_MAX, - *SIGNED_BEACON_BLOCK_BASE_MAX, - ), + ForkName::Merge => RpcLimits::new( + *SIGNED_BEACON_BLOCK_MERGE_MIN, *SIGNED_BEACON_BLOCK_MERGE_MAX, ), - ), + }, Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), From 959c1ae12aec1d20b754744b9345d12834264cec Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 31 Mar 2022 02:06:58 +0530 Subject: [PATCH 03/10] Address review comments --- .../lighthouse_network/src/rpc/protocol.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 29b86f1af28..8898d774125 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -63,10 +63,18 @@ lazy_static! { /// The `BeaconBlockMerge` block has an `ExecutionPayload` field which has a max size ~16 GiB for future proofing. 
/// We calculate the value from its fields instead of constructing the block and checking the length. + /// Note: This is only the theoretical upper bound. We further bound the max size we receive over the network + /// with `MAX_RPC_SIZE_POST_MERGE`. pub static ref SIGNED_BEACON_BLOCK_MERGE_MAX: usize = - *SIGNED_BEACON_BLOCK_MERGE_MIN - + types::ExecutionPayload::::max_execution_payload_size() - - types::ExecutionPayload::::empty().as_ssz_bytes().len(); + // Size of a full merge block with an empty execution payload + SignedBeaconBlock::::from_block( + BeaconBlock::Merge(BeaconBlockMerge::::full(&MainnetEthSpec::default_spec())), + Signature::empty(), + ) + .as_ssz_bytes() + .len() + - types::ExecutionPayload::::empty().as_ssz_bytes().len() // subtracting size of empty execution payload included above + + types::ExecutionPayload::::max_execution_payload_size(); // adding max size of execution payload (~16gb) pub static ref BLOCKS_BY_ROOT_REQUEST_MIN: usize = VariableList::::from(Vec::::new()) @@ -111,7 +119,7 @@ const REQUEST_TIMEOUT: u64 = 15; pub fn max_rpc_size(fork_context: &ForkContext) -> usize { match fork_context.current_fork() { ForkName::Merge => MAX_RPC_SIZE_POST_MERGE, - _ => MAX_RPC_SIZE, + ForkName::Altair | ForkName::Base => MAX_RPC_SIZE, } } From 1d065cdb41bf44023e9026eb8834d84fd0c9ccee Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 31 Mar 2022 18:16:07 +0530 Subject: [PATCH 04/10] Fix rpc tests to be fork aware --- .../lighthouse_network/tests/common/mod.rs | 43 +++++++++++++------ .../lighthouse_network/tests/rpc_tests.rs | 32 ++++++++------ 2 files changed, 50 insertions(+), 25 deletions(-) diff --git a/beacon_node/lighthouse_network/tests/common/mod.rs b/beacon_node/lighthouse_network/tests/common/mod.rs index e79fdf464dd..ea770de6c23 100644 --- a/beacon_node/lighthouse_network/tests/common/mod.rs +++ b/beacon_node/lighthouse_network/tests/common/mod.rs @@ -10,7 +10,9 @@ use std::sync::Arc; use std::sync::Weak; use std::time::Duration; use tokio::runtime::Runtime; -use types::{ChainSpec, EnrForkId, EthSpec, ForkContext, Hash256, MinimalEthSpec}; +use types::{ + ChainSpec, EnrForkId, Epoch, EthSpec, ForkContext, ForkName, Hash256, MinimalEthSpec, Slot, +}; use unused_port::unused_tcp_port; #[allow(clippy::type_complexity)] @@ -26,13 +28,20 @@ type ReqId = usize; use tempfile::Builder as TempBuilder; /// Returns a dummy fork context -pub fn fork_context() -> ForkContext { +pub fn fork_context(fork_name: ForkName) -> ForkContext { let mut chain_spec = E::default_spec(); - // Set fork_epoch to `Some` to ensure that the `ForkContext` object - // includes altair in the list of forks - chain_spec.altair_fork_epoch = Some(types::Epoch::new(42)); - chain_spec.bellatrix_fork_epoch = Some(types::Epoch::new(84)); - ForkContext::new::(types::Slot::new(0), Hash256::zero(), &chain_spec) + let altair_fork_epoch = Epoch::new(1); + let merge_fork_epoch = Epoch::new(2); + + chain_spec.altair_fork_epoch = Some(altair_fork_epoch); + chain_spec.bellatrix_fork_epoch = Some(merge_fork_epoch); + + let current_slot = match fork_name { + ForkName::Base => Slot::new(0), + ForkName::Altair => altair_fork_epoch.start_slot(E::slots_per_epoch()), + ForkName::Merge => merge_fork_epoch.start_slot(E::slots_per_epoch()), + }; + ForkContext::new::(current_slot, Hash256::zero(), &chain_spec) } pub struct Libp2pInstance(LibP2PService, exit_future::Signal); @@ -90,6 +99,7 @@ pub async fn build_libp2p_instance( rt: Weak, boot_nodes: Vec, log: slog::Logger, + fork_name: ForkName, ) -> 
Libp2pInstance { let port = unused_tcp_port().unwrap(); let config = build_config(port, boot_nodes); @@ -101,7 +111,7 @@ pub async fn build_libp2p_instance( let libp2p_context = lighthouse_network::Context { config: &config, enr_fork_id: EnrForkId::default(), - fork_context: Arc::new(fork_context()), + fork_context: Arc::new(fork_context(fork_name)), chain_spec: &ChainSpec::minimal(), gossipsub_registry: None, }; @@ -125,10 +135,11 @@ pub async fn build_full_mesh( rt: Weak, log: slog::Logger, n: usize, + fork_name: ForkName, ) -> Vec { let mut nodes = Vec::with_capacity(n); for _ in 0..n { - nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone()).await); + nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone(), fork_name).await); } let multiaddrs: Vec = nodes .iter() @@ -154,12 +165,13 @@ pub async fn build_full_mesh( pub async fn build_node_pair( rt: Weak, log: &slog::Logger, + fork_name: ForkName, ) -> (Libp2pInstance, Libp2pInstance) { let sender_log = log.new(o!("who" => "sender")); let receiver_log = log.new(o!("who" => "receiver")); - let mut sender = build_libp2p_instance(rt.clone(), vec![], sender_log).await; - let mut receiver = build_libp2p_instance(rt, vec![], receiver_log).await; + let mut sender = build_libp2p_instance(rt.clone(), vec![], sender_log, fork_name).await; + let mut receiver = build_libp2p_instance(rt, vec![], receiver_log, fork_name).await; let receiver_multiaddr = receiver.swarm.behaviour_mut().local_enr().multiaddr()[1].clone(); @@ -198,10 +210,15 @@ pub async fn build_node_pair( // Returns `n` peers in a linear topology #[allow(dead_code)] -pub async fn build_linear(rt: Weak, log: slog::Logger, n: usize) -> Vec { +pub async fn build_linear( + rt: Weak, + log: slog::Logger, + n: usize, + fork_name: ForkName, +) -> Vec { let mut nodes = Vec::with_capacity(n); for _ in 0..n { - nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone()).await); + nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone(), fork_name).await); } let multiaddrs: Vec = nodes diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index a270e404478..f9db2962606 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -12,7 +12,7 @@ use tokio::runtime::Runtime; use tokio::time::sleep; use types::{ BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, Epoch, EthSpec, ForkContext, - Hash256, MinimalEthSpec, Signature, SignedBeaconBlock, Slot, + ForkName, Hash256, MinimalEthSpec, Signature, SignedBeaconBlock, Slot, }; mod common; @@ -61,7 +61,8 @@ fn test_status_rpc() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // Dummy STATUS RPC message let rpc_request = Request::Status(StatusMessage { @@ -159,7 +160,8 @@ fn test_blocks_by_range_chunked_rpc() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { @@ -179,7 +181,7 @@ fn test_blocks_by_range_chunked_rpc() { let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); 
let rpc_response_altair = Response::BlocksByRange(Some(Box::new(signed_full_block))); - let full_block = merge_block_small(&common::fork_context()); + let full_block = merge_block_small(&common::fork_context(ForkName::Merge)); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_merge_small = Response::BlocksByRange(Some(Box::new(signed_full_block))); @@ -298,7 +300,8 @@ fn test_blocks_by_range_over_limit() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { @@ -308,7 +311,7 @@ fn test_blocks_by_range_over_limit() { }); // BlocksByRange Response - let full_block = merge_block_large(&common::fork_context()); + let full_block = merge_block_large(&common::fork_context(ForkName::Merge)); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_merge_large = Response::BlocksByRange(Some(Box::new(signed_full_block))); @@ -395,7 +398,8 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { @@ -526,7 +530,8 @@ fn test_blocks_by_range_single_empty_rpc() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { @@ -641,7 +646,8 @@ fn test_blocks_by_root_chunked_rpc() { let rt = Arc::new(Runtime::new().unwrap()); // get sender/receiver rt.block_on(async { - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await; // BlocksByRoot Request let rpc_request = Request::BlocksByRoot(BlocksByRootRequest { @@ -664,7 +670,7 @@ fn test_blocks_by_root_chunked_rpc() { let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_altair = Response::BlocksByRoot(Some(Box::new(signed_full_block))); - let full_block = merge_block_small(&common::fork_context()); + let full_block = merge_block_small(&common::fork_context(ForkName::Merge)); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_merge_small = Response::BlocksByRoot(Some(Box::new(signed_full_block))); @@ -779,7 +785,8 @@ fn test_blocks_by_root_chunked_rpc_terminates_correctly() { let rt = Arc::new(Runtime::new().unwrap()); // get sender/receiver rt.block_on(async { - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // BlocksByRoot Request let rpc_request = Request::BlocksByRoot(BlocksByRootRequest { @@ -916,7 +923,8 @@ fn test_goodbye_rpc() { let rt = 
Arc::new(Runtime::new().unwrap()); // get sender/receiver rt.block_on(async { - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // build the sender future let sender_future = async { From f3227de0dd47f0f5d763f1b344891bd6fd80c921 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 31 Mar 2022 19:44:18 +0530 Subject: [PATCH 05/10] Update rpc encode/decode tests --- .../lighthouse_network/src/rpc/codec/base.rs | 48 +++- .../src/rpc/codec/ssz_snappy.rs | 239 +++++++++++++++--- 2 files changed, 238 insertions(+), 49 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/codec/base.rs b/beacon_node/lighthouse_network/src/rpc/codec/base.rs index 02eaeac73e2..c17af4289c3 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/base.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/base.rs @@ -184,13 +184,25 @@ mod tests { use crate::rpc::protocol::*; use std::sync::Arc; - use types::{ForkContext, Hash256}; + use types::{Epoch, ForkContext, ForkName, Hash256, Slot}; use unsigned_varint::codec::Uvi; type Spec = types::MainnetEthSpec; - fn fork_context() -> ForkContext { - ForkContext::new::(types::Slot::new(0), Hash256::zero(), &Spec::default_spec()) + fn fork_context(fork_name: ForkName) -> ForkContext { + let mut chain_spec = Spec::default_spec(); + let altair_fork_epoch = Epoch::new(1); + let merge_fork_epoch = Epoch::new(2); + + chain_spec.altair_fork_epoch = Some(altair_fork_epoch); + chain_spec.bellatrix_fork_epoch = Some(merge_fork_epoch); + + let current_slot = match fork_name { + ForkName::Base => Slot::new(0), + ForkName::Altair => altair_fork_epoch.start_slot(Spec::slots_per_epoch()), + ForkName::Merge => merge_fork_epoch.start_slot(Spec::slots_per_epoch()), + }; + ForkContext::new::(current_slot, Hash256::zero(), &chain_spec) } #[test] @@ -202,9 +214,12 @@ mod tests { let snappy_protocol_id = ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy); - let fork_context = Arc::new(fork_context()); - let mut snappy_outbound_codec = - SSZSnappyOutboundCodec::::new(snappy_protocol_id, 1_048_576, fork_context); + let fork_context = Arc::new(fork_context(ForkName::Base)); + let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( + snappy_protocol_id, + max_rpc_size(&fork_context), + fork_context, + ); // remove response code let mut snappy_buf = buf.clone(); @@ -234,9 +249,12 @@ mod tests { let snappy_protocol_id = ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy); - let fork_context = Arc::new(fork_context()); - let mut snappy_outbound_codec = - SSZSnappyOutboundCodec::::new(snappy_protocol_id, 1_048_576, fork_context); + let fork_context = Arc::new(fork_context(ForkName::Base)); + let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( + snappy_protocol_id, + max_rpc_size(&fork_context), + fork_context, + ); let snappy_decoded_message = snappy_outbound_codec.decode(&mut dst).unwrap_err(); @@ -260,12 +278,13 @@ mod tests { ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy); // Response limits - let fork_context = Arc::new(fork_context()); + let fork_context = Arc::new(fork_context(ForkName::Base)); + let max_rpc_size = max_rpc_size(&fork_context); let limit = protocol_id.rpc_response_limits::(&fork_context); let mut max = encode_len(limit.max + 1); let mut codec = SSZSnappyOutboundCodec::::new( protocol_id.clone(), - 1_048_576, + max_rpc_size, 
fork_context.clone(), ); assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData); @@ -273,7 +292,7 @@ mod tests { let mut min = encode_len(limit.min - 1); let mut codec = SSZSnappyOutboundCodec::::new( protocol_id.clone(), - 1_048_576, + max_rpc_size, fork_context.clone(), ); assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData); @@ -283,13 +302,14 @@ mod tests { let mut max = encode_len(limit.max + 1); let mut codec = SSZSnappyOutboundCodec::::new( protocol_id.clone(), - 1_048_576, + max_rpc_size, fork_context.clone(), ); assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData); let mut min = encode_len(limit.min - 1); - let mut codec = SSZSnappyOutboundCodec::::new(protocol_id, 1_048_576, fork_context); + let mut codec = + SSZSnappyOutboundCodec::::new(protocol_id, max_rpc_size, fork_context); assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData); } } diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index bf15bafe577..443eba143f9 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -617,8 +617,8 @@ mod tests { }; use std::sync::Arc; use types::{ - BeaconBlock, BeaconBlockAltair, BeaconBlockBase, Epoch, ForkContext, Hash256, Signature, - SignedBeaconBlock, Slot, + BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, Epoch, ForkContext, + Hash256, Signature, SignedBeaconBlock, Slot, }; use snap::write::FrameEncoder; @@ -627,12 +627,20 @@ mod tests { type Spec = types::MainnetEthSpec; - fn fork_context() -> ForkContext { + fn fork_context(fork_name: ForkName) -> ForkContext { let mut chain_spec = Spec::default_spec(); - // Set fork_epoch to `Some` to ensure that the `ForkContext` object - // includes altair in the list of forks - chain_spec.altair_fork_epoch = Some(types::Epoch::new(42)); - ForkContext::new::(types::Slot::new(0), Hash256::zero(), &chain_spec) + let altair_fork_epoch = Epoch::new(1); + let merge_fork_epoch = Epoch::new(2); + + chain_spec.altair_fork_epoch = Some(altair_fork_epoch); + chain_spec.bellatrix_fork_epoch = Some(merge_fork_epoch); + + let current_slot = match fork_name { + ForkName::Base => Slot::new(0), + ForkName::Altair => altair_fork_epoch.start_slot(Spec::slots_per_epoch()), + ForkName::Merge => merge_fork_epoch.start_slot(Spec::slots_per_epoch()), + }; + ForkContext::new::(current_slot, Hash256::zero(), &chain_spec) } fn base_block() -> SignedBeaconBlock { @@ -646,6 +654,34 @@ mod tests { SignedBeaconBlock::from_block(full_block, Signature::empty()) } + /// Merge block with length < max_rpc_size. + fn merge_block_small(fork_context: &ForkContext) -> SignedBeaconBlock { + let mut block = BeaconBlockMerge::empty(&Spec::default_spec()); + let tx = VariableList::from(vec![0; 1024]); + let txs = VariableList::from(std::iter::repeat(tx).take(100).collect::>()); + + block.body.execution_payload.transactions = txs; + + let block = BeaconBlock::Merge(block); + assert!(block.ssz_bytes_len() <= max_rpc_size(fork_context)); + SignedBeaconBlock::from_block(block, Signature::empty()) + } + + /// Merge block with length > MAX_RPC_SIZE. + /// The max limit for a merge block is in the order of ~16GiB which wouldn't fit in memory. + /// Hence, we generate a merge block just greater than `MAX_RPC_SIZE` to test rejection on the rpc layer. 
+ fn merge_block_large(fork_context: &ForkContext) -> SignedBeaconBlock { + let mut block = BeaconBlockMerge::empty(&Spec::default_spec()); + let tx = VariableList::from(vec![0; 1024]); + let txs = VariableList::from(std::iter::repeat(tx).take(100000).collect::>()); + + block.body.execution_payload.transactions = txs; + + let block = BeaconBlock::Merge(block); + assert!(block.ssz_bytes_len() > max_rpc_size(fork_context)); + SignedBeaconBlock::from_block(block, Signature::empty()) + } + fn status_message() -> StatusMessage { StatusMessage { fork_digest: [0; 4], @@ -680,10 +716,11 @@ mod tests { protocol: Protocol, version: Version, message: RPCCodedResponse, + fork_name: ForkName, ) -> Result { - let max_packet_size = 1_048_576; let snappy_protocol_id = ProtocolId::new(protocol, version, Encoding::SSZSnappy); - let fork_context = Arc::new(fork_context()); + let fork_context = Arc::new(fork_context(fork_name)); + let max_packet_size = max_rpc_size(&fork_context); let mut buf = BytesMut::new(); let mut snappy_inbound_codec = @@ -698,9 +735,10 @@ mod tests { protocol: Protocol, version: Version, message: &mut BytesMut, + fork_name: ForkName, ) -> Result>, RPCError> { let snappy_protocol_id = ProtocolId::new(protocol, version, Encoding::SSZSnappy); - let fork_context = Arc::new(fork_context()); + let fork_context = Arc::new(fork_context(fork_name)); let max_packet_size = max_rpc_size(&fork_context); let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new(snappy_protocol_id, max_packet_size, fork_context); @@ -713,9 +751,10 @@ mod tests { protocol: Protocol, version: Version, message: RPCCodedResponse, + fork_name: ForkName, ) -> Result>, RPCError> { - let mut encoded = encode(protocol, version.clone(), message)?; - decode(protocol, version, &mut encoded) + let mut encoded = encode(protocol, version.clone(), message, fork_name)?; + decode(protocol, version, &mut encoded, fork_name) } // Test RPCResponse encoding/decoding for V1 messages @@ -725,7 +764,8 @@ mod tests { encode_then_decode( Protocol::Status, Version::V1, - RPCCodedResponse::Success(RPCResponse::Status(status_message())) + RPCCodedResponse::Success(RPCResponse::Status(status_message())), + ForkName::Base, ), Ok(Some(RPCResponse::Status(status_message()))) ); @@ -734,7 +774,8 @@ mod tests { encode_then_decode( Protocol::Ping, Version::V1, - RPCCodedResponse::Success(RPCResponse::Pong(ping_message())) + RPCCodedResponse::Success(RPCResponse::Pong(ping_message())), + ForkName::Base, ), Ok(Some(RPCResponse::Pong(ping_message()))) ); @@ -743,7 +784,8 @@ mod tests { encode_then_decode( Protocol::BlocksByRange, Version::V1, - RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))), + ForkName::Base, ), Ok(Some(RPCResponse::BlocksByRange(Box::new(base_block())))) ); @@ -754,6 +796,7 @@ mod tests { Protocol::BlocksByRange, Version::V1, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(altair_block()))), + ForkName::Altair, ) .unwrap_err(), RPCError::SSZDecodeError(_) @@ -765,7 +808,8 @@ mod tests { encode_then_decode( Protocol::BlocksByRoot, Version::V1, - RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ForkName::Base, ), Ok(Some(RPCResponse::BlocksByRoot(Box::new(base_block())))) ); @@ -776,6 +820,7 @@ mod tests { Protocol::BlocksByRoot, Version::V1, 
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))), + ForkName::Altair, ) .unwrap_err(), RPCError::SSZDecodeError(_) @@ -788,6 +833,7 @@ mod tests { Protocol::MetaData, Version::V1, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), + ForkName::Base, ), Ok(Some(RPCResponse::MetaData(metadata()))), ); @@ -797,6 +843,7 @@ mod tests { Protocol::MetaData, Version::V1, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), + ForkName::Base, ), Ok(Some(RPCResponse::MetaData(metadata()))), ); @@ -807,6 +854,7 @@ mod tests { Protocol::MetaData, Version::V1, RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())), + ForkName::Base, ), Ok(Some(RPCResponse::MetaData(metadata()))), ); @@ -821,6 +869,7 @@ mod tests { Protocol::Status, Version::V2, RPCCodedResponse::Success(RPCResponse::Status(status_message())), + ForkName::Base, ) .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), @@ -834,6 +883,7 @@ mod tests { Protocol::Ping, Version::V2, RPCCodedResponse::Success(RPCResponse::Pong(ping_message())), + ForkName::Base, ) .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), @@ -845,7 +895,8 @@ mod tests { encode_then_decode( Protocol::BlocksByRange, Version::V2, - RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))), + ForkName::Base, ), Ok(Some(RPCResponse::BlocksByRange(Box::new(base_block())))) ); @@ -854,35 +905,98 @@ mod tests { encode_then_decode( Protocol::BlocksByRange, Version::V2, - RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(altair_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(altair_block()))), + ForkName::Altair, ), Ok(Some(RPCResponse::BlocksByRange(Box::new(altair_block())))) ); + let merge_block_small = merge_block_small(&fork_context(ForkName::Merge)); + let merge_block_large = merge_block_large(&fork_context(ForkName::Merge)); + + assert_eq!( + encode_then_decode( + Protocol::BlocksByRange, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new( + merge_block_small.clone() + ))), + ForkName::Merge, + ), + Ok(Some(RPCResponse::BlocksByRange(Box::new( + merge_block_small.clone() + )))) + ); + + assert_eq!( + encode_then_decode( + Protocol::BlocksByRange, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new( + merge_block_large.clone() + ))), + ForkName::Merge, + ) + .unwrap_err(), + RPCError::InvalidData, + "Decoding a block larger than max_rpc_size should fail" + ); + assert_eq!( encode_then_decode( Protocol::BlocksByRoot, Version::V2, - RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ForkName::Base, ), - Ok(Some(RPCResponse::BlocksByRoot(Box::new(base_block())))) + Ok(Some(RPCResponse::BlocksByRoot(Box::new(base_block())))), ); assert_eq!( encode_then_decode( Protocol::BlocksByRoot, Version::V2, - RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))), + ForkName::Altair, ), Ok(Some(RPCResponse::BlocksByRoot(Box::new(altair_block())))) ); + assert_eq!( + encode_then_decode( + Protocol::BlocksByRoot, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new( + merge_block_small.clone() + ))), + ForkName::Merge, + ), + 
Ok(Some(RPCResponse::BlocksByRoot(Box::new( + merge_block_small.clone() + )))) + ); + + assert_eq!( + encode_then_decode( + Protocol::BlocksByRoot, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new( + merge_block_large.clone() + ))), + ForkName::Merge, + ) + .unwrap_err(), + RPCError::InvalidData, + "Decoding a block larger than max_rpc_size should fail" + ); + // A MetaDataV1 still encodes as a MetaDataV2 since version is Version::V2 assert_eq!( encode_then_decode( Protocol::MetaData, Version::V2, - RPCCodedResponse::Success(RPCResponse::MetaData(metadata())) + RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), + ForkName::Base, ), Ok(Some(RPCResponse::MetaData(metadata_v2()))) ); @@ -891,7 +1005,8 @@ mod tests { encode_then_decode( Protocol::MetaData, Version::V2, - RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())) + RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())), + ForkName::Altair, ), Ok(Some(RPCResponse::MetaData(metadata_v2()))) ); @@ -900,20 +1015,27 @@ mod tests { // Test RPCResponse encoding/decoding for V2 messages #[test] fn test_context_bytes_v2() { - let fork_context = fork_context(); + let fork_context = fork_context(ForkName::Altair); // Removing context bytes for v2 messages should error let mut encoded_bytes = encode( Protocol::BlocksByRange, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))), + ForkName::Base, ) .unwrap(); let _ = encoded_bytes.split_to(4); assert!(matches!( - decode(Protocol::BlocksByRange, Version::V2, &mut encoded_bytes).unwrap_err(), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut encoded_bytes, + ForkName::Base + ) + .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), )); @@ -921,13 +1043,20 @@ mod tests { Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ForkName::Base, ) .unwrap(); let _ = encoded_bytes.split_to(4); assert!(matches!( - decode(Protocol::BlocksByRange, Version::V2, &mut encoded_bytes).unwrap_err(), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut encoded_bytes, + ForkName::Base + ) + .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), )); @@ -936,6 +1065,7 @@ mod tests { Protocol::BlocksByRange, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))), + ForkName::Altair, ) .unwrap(); @@ -945,7 +1075,13 @@ mod tests { wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4)); assert!(matches!( - decode(Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes).unwrap_err(), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut wrong_fork_bytes, + ForkName::Altair + ) + .unwrap_err(), RPCError::SSZDecodeError(_), )); @@ -954,6 +1090,7 @@ mod tests { Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))), + ForkName::Altair, ) .unwrap(); @@ -962,7 +1099,13 @@ mod tests { wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4)); assert!(matches!( - decode(Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes).unwrap_err(), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut wrong_fork_bytes, + ForkName::Altair + ) + .unwrap_err(), RPCError::SSZDecodeError(_), )); @@ -974,17 +1117,25 @@ mod tests { Protocol::MetaData, Version::V2, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), + ForkName::Altair, ) .unwrap(), ); - 
assert!(decode(Protocol::MetaData, Version::V2, &mut encoded_bytes).is_err()); + assert!(decode( + Protocol::MetaData, + Version::V2, + &mut encoded_bytes, + ForkName::Altair + ) + .is_err()); // Sending context bytes which do not correspond to any fork should return an error let mut encoded_bytes = encode( Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ForkName::Altair, ) .unwrap(); @@ -993,7 +1144,13 @@ mod tests { wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4)); assert!(matches!( - decode(Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes).unwrap_err(), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut wrong_fork_bytes, + ForkName::Altair + ) + .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), )); @@ -1002,13 +1159,19 @@ mod tests { Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ForkName::Altair, ) .unwrap(); let mut part = encoded_bytes.split_to(3); assert_eq!( - decode(Protocol::BlocksByRange, Version::V2, &mut part), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut part, + ForkName::Altair + ), Ok(None) ) } @@ -1064,7 +1227,7 @@ mod tests { // 10 (for stream identifier) + 80 + 42 = 132 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`. assert_eq!( - decode(Protocol::Status, Version::V1, &mut dst).unwrap_err(), + decode(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(), RPCError::InvalidData ); } @@ -1073,7 +1236,7 @@ mod tests { /// sends a valid message filled with a stream of useless padding before the actual message. #[test] fn test_decode_malicious_v2_message() { - let fork_context = Arc::new(fork_context()); + let fork_context = Arc::new(fork_context(ForkName::Altair)); // 10 byte snappy stream identifier let stream_identifier: &'static [u8] = b"\xFF\x06\x00\x00sNaPpY"; @@ -1121,7 +1284,13 @@ mod tests { // 10 (for stream identifier) + 176156 + 8103 = 184269 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`. 
assert_eq!( - decode(Protocol::BlocksByRange, Version::V2, &mut dst).unwrap_err(), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut dst, + ForkName::Altair + ) + .unwrap_err(), RPCError::InvalidData ); } @@ -1160,7 +1329,7 @@ mod tests { dst.extend_from_slice(writer.get_ref()); assert_eq!( - decode(Protocol::Status, Version::V1, &mut dst).unwrap_err(), + decode(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(), RPCError::InvalidData ); } From 380415d7623379482d43ee381b45edb4372809b4 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 31 Mar 2022 20:07:30 +0530 Subject: [PATCH 06/10] Fix large block tests --- .../src/rpc/codec/ssz_snappy.rs | 48 +++++++++++++++---- 1 file changed, 40 insertions(+), 8 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index 443eba143f9..1aa88f3dd48 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -730,6 +730,34 @@ mod tests { Ok(buf) } + fn encode_without_length_checks( + bytes: Vec, + fork_name: ForkName, + ) -> Result { + let fork_context = fork_context(fork_name); + let mut dst = BytesMut::new(); + + // Add context bytes if required + dst.extend_from_slice(&fork_context.to_context_bytes(fork_name).unwrap()); + + let mut uvi_codec: Uvi = Uvi::default(); + + // Inserts the length prefix of the uncompressed bytes into dst + // encoded as a unsigned varint + uvi_codec + .encode(bytes.len(), &mut dst) + .map_err(RPCError::from)?; + + let mut writer = FrameEncoder::new(Vec::new()); + writer.write_all(&bytes).map_err(RPCError::from)?; + writer.flush().map_err(RPCError::from)?; + + // Write compressed bytes to `dst` + dst.extend_from_slice(writer.get_ref()); + + Ok(dst) + } + /// Attempts to decode the given protocol bytes as an rpc response fn decode( protocol: Protocol, @@ -928,13 +956,15 @@ mod tests { )))) ); + let mut encoded = + encode_without_length_checks(merge_block_large.as_ssz_bytes(), ForkName::Merge) + .unwrap(); + assert_eq!( - encode_then_decode( + decode( Protocol::BlocksByRange, Version::V2, - RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new( - merge_block_large.clone() - ))), + &mut encoded, ForkName::Merge, ) .unwrap_err(), @@ -976,13 +1006,15 @@ mod tests { )))) ); + let mut encoded = + encode_without_length_checks(merge_block_large.as_ssz_bytes(), ForkName::Merge) + .unwrap(); + assert_eq!( - encode_then_decode( + decode( Protocol::BlocksByRoot, Version::V2, - RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new( - merge_block_large.clone() - ))), + &mut encoded, ForkName::Merge, ) .unwrap_err(), From 3323ed9a097d314964213f69153f52d263cf1881 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 31 Mar 2022 20:17:16 +0530 Subject: [PATCH 07/10] lint --- beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index 1aa88f3dd48..0df2fc1a1bf 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -1001,9 +1001,7 @@ mod tests { ))), ForkName::Merge, ), - Ok(Some(RPCResponse::BlocksByRoot(Box::new( - merge_block_small.clone() - )))) + Ok(Some(RPCResponse::BlocksByRoot(Box::new(merge_block_small)))) ); let mut encoded = From 
08ee1622db6cfb8791945a571a91fcb26dd0f09e Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 31 Mar 2022 23:18:20 +0530 Subject: [PATCH 08/10] Compiles after rebase --- .../lighthouse_network/src/rpc/codec/ssz_snappy.rs | 12 +++++++----- beacon_node/lighthouse_network/src/rpc/protocol.rs | 13 ++++--------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index 0df2fc1a1bf..9a18b3336eb 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -618,7 +618,7 @@ mod tests { use std::sync::Arc; use types::{ BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, Epoch, ForkContext, - Hash256, Signature, SignedBeaconBlock, Slot, + FullPayload, Hash256, Signature, SignedBeaconBlock, Slot, }; use snap::write::FrameEncoder; @@ -656,11 +656,12 @@ mod tests { /// Merge block with length < max_rpc_size. fn merge_block_small(fork_context: &ForkContext) -> SignedBeaconBlock { - let mut block = BeaconBlockMerge::empty(&Spec::default_spec()); + let mut block: BeaconBlockMerge<_, FullPayload> = + BeaconBlockMerge::empty(&Spec::default_spec()); let tx = VariableList::from(vec![0; 1024]); let txs = VariableList::from(std::iter::repeat(tx).take(100).collect::>()); - block.body.execution_payload.transactions = txs; + block.body.execution_payload.execution_payload.transactions = txs; let block = BeaconBlock::Merge(block); assert!(block.ssz_bytes_len() <= max_rpc_size(fork_context)); @@ -671,11 +672,12 @@ mod tests { /// The max limit for a merge block is in the order of ~16GiB which wouldn't fit in memory. /// Hence, we generate a merge block just greater than `MAX_RPC_SIZE` to test rejection on the rpc layer. fn merge_block_large(fork_context: &ForkContext) -> SignedBeaconBlock { - let mut block = BeaconBlockMerge::empty(&Spec::default_spec()); + let mut block: BeaconBlockMerge<_, FullPayload> = + BeaconBlockMerge::empty(&Spec::default_spec()); let tx = VariableList::from(vec![0; 1024]); let txs = VariableList::from(std::iter::repeat(tx).take(100000).collect::>()); - block.body.execution_payload.transactions = txs; + block.body.execution_payload.execution_payload.transactions = txs; let block = BeaconBlock::Merge(block); assert!(block.ssz_bytes_len() > max_rpc_size(fork_context)); diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 8898d774125..c0431253735 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -66,15 +66,10 @@ lazy_static! { /// Note: This is only the theoretical upper bound. We further bound the max size we receive over the network /// with `MAX_RPC_SIZE_POST_MERGE`. 
pub static ref SIGNED_BEACON_BLOCK_MERGE_MAX: usize = - // Size of a full merge block with an empty execution payload - SignedBeaconBlock::::from_block( - BeaconBlock::Merge(BeaconBlockMerge::::full(&MainnetEthSpec::default_spec())), - Signature::empty(), - ) - .as_ssz_bytes() - .len() - - types::ExecutionPayload::::empty().as_ssz_bytes().len() // subtracting size of empty execution payload included above - + types::ExecutionPayload::::max_execution_payload_size(); // adding max size of execution payload (~16gb) + // Size of a full altair block + *SIGNED_BEACON_BLOCK_ALTAIR_MAX + + types::ExecutionPayload::::max_execution_payload_size() // adding max size of execution payload (~16gb) + + ssz::BYTES_PER_LENGTH_OFFSET; // Adding the additional ssz offset for the `ExecutionPayload` field pub static ref BLOCKS_BY_ROOT_REQUEST_MIN: usize = VariableList::::from(Vec::::new()) From 6370958bbcdcefc7b3392f5a9917410109e6bee4 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Fri, 1 Apr 2022 00:46:06 +0530 Subject: [PATCH 09/10] Add more info to the InvalidData error type --- .../src/peer_manager/mod.rs | 2 +- .../lighthouse_network/src/rpc/codec/base.rs | 20 ++++- .../src/rpc/codec/ssz_snappy.rs | 90 +++++++++++-------- .../lighthouse_network/src/rpc/handler.rs | 2 +- .../lighthouse_network/src/rpc/protocol.rs | 6 +- 5 files changed, 76 insertions(+), 44 deletions(-) diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 437d05d4744..de2fdafe63b 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -462,7 +462,7 @@ impl PeerManager { // Our fault. Do nothing return; } - RPCError::InvalidData => { + RPCError::InvalidData(_) => { // Peer is not complying with the protocol. 
This is considered a malicious action PeerAction::Fatal } diff --git a/beacon_node/lighthouse_network/src/rpc/codec/base.rs b/beacon_node/lighthouse_network/src/rpc/codec/base.rs index c17af4289c3..53f85d9a7b6 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/base.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/base.rs @@ -287,7 +287,10 @@ mod tests { max_rpc_size, fork_context.clone(), ); - assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData); + assert!(matches!( + codec.decode(&mut max).unwrap_err(), + RPCError::InvalidData(_) + )); let mut min = encode_len(limit.min - 1); let mut codec = SSZSnappyOutboundCodec::::new( @@ -295,7 +298,10 @@ mod tests { max_rpc_size, fork_context.clone(), ); - assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData); + assert!(matches!( + codec.decode(&mut min).unwrap_err(), + RPCError::InvalidData(_) + )); // Request limits let limit = protocol_id.rpc_request_limits(); @@ -305,11 +311,17 @@ mod tests { max_rpc_size, fork_context.clone(), ); - assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData); + assert!(matches!( + codec.decode(&mut max).unwrap_err(), + RPCError::InvalidData(_) + )); let mut min = encode_len(limit.min - 1); let mut codec = SSZSnappyOutboundCodec::::new(protocol_id, max_rpc_size, fork_context); - assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData); + assert!(matches!( + codec.decode(&mut min).unwrap_err(), + RPCError::InvalidData(_) + )); } } diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index 9a18b3336eb..54925c3f592 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -146,7 +146,10 @@ impl Decoder for SSZSnappyInboundCodec { // packet size for ssz container corresponding to `self.protocol`. let ssz_limits = self.protocol.rpc_request_limits(); if ssz_limits.is_out_of_bounds(length, self.max_packet_size) { - return Err(RPCError::InvalidData); + return Err(RPCError::InvalidData(format!( + "RPC request length is out of bounds, length {}", + length + ))); } // Calculate worst case compression length for given uncompressed length let max_compressed_len = snap::raw::max_compress_len(length) as u64; @@ -283,7 +286,10 @@ impl Decoder for SSZSnappyOutboundCodec { .protocol .rpc_response_limits::(&self.fork_context); if ssz_limits.is_out_of_bounds(length, self.max_packet_size) { - return Err(RPCError::InvalidData); + return Err(RPCError::InvalidData(format!( + "RPC response length is out of bounds, length {}", + length + ))); } // Calculate worst case compression length for given uncompressed length let max_compressed_len = snap::raw::max_compress_len(length) as u64; @@ -329,7 +335,10 @@ impl OutboundCodec> for SSZSnappyOutbound // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of // packet size for ssz container corresponding to `ErrorType`. if length > self.max_packet_size || length > *ERROR_TYPE_MAX || length < *ERROR_TYPE_MIN { - return Err(RPCError::InvalidData); + return Err(RPCError::InvalidData(format!( + "RPC Error length is out of bounds, length {}", + length + ))); } // Calculate worst case compression length for given uncompressed length @@ -366,7 +375,10 @@ fn handle_error( // If snappy has read `max_compressed_len` from underlying stream and still can't fill buffer, we have a malicious message. 
// Report as `InvalidData` so that malicious peer gets banned. if num_bytes >= max_compressed_len { - Err(RPCError::InvalidData) + Err(RPCError::InvalidData(format!( + "Received malicious snappy message, num_bytes {}, max_compressed_len {}", + num_bytes, max_compressed_len + ))) } else { // Haven't received enough bytes to decode yet, wait for more Ok(None) @@ -462,7 +474,9 @@ fn handle_v1_request( // Handle this case just for completeness. Protocol::MetaData => { if !decoded_buffer.is_empty() { - Err(RPCError::InvalidData) + Err(RPCError::InternalError( + "Metadata requests shouldn't reach decoder", + )) } else { Ok(Some(InboundRequest::MetaData(PhantomData))) } @@ -488,7 +502,7 @@ fn handle_v2_request( // Handle this case just for completeness. Protocol::MetaData => { if !decoded_buffer.is_empty() { - Err(RPCError::InvalidData) + Err(RPCError::InvalidData("Metadata request".to_string())) } else { Ok(Some(InboundRequest::MetaData(PhantomData))) } @@ -512,7 +526,9 @@ fn handle_v1_response( decoded_buffer, )?))), // This case should be unreachable as `Goodbye` has no response. - Protocol::Goodbye => Err(RPCError::InvalidData), + Protocol::Goodbye => Err(RPCError::InvalidData( + "Goodbye RPC message has no valid response".to_string(), + )), Protocol::BlocksByRange => Ok(Some(RPCResponse::BlocksByRange(Box::new( SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), )))), @@ -659,7 +675,7 @@ mod tests { let mut block: BeaconBlockMerge<_, FullPayload> = BeaconBlockMerge::empty(&Spec::default_spec()); let tx = VariableList::from(vec![0; 1024]); - let txs = VariableList::from(std::iter::repeat(tx).take(100).collect::>()); + let txs = VariableList::from(std::iter::repeat(tx).take(10000).collect::>()); block.body.execution_payload.execution_payload.transactions = txs; @@ -962,15 +978,17 @@ mod tests { encode_without_length_checks(merge_block_large.as_ssz_bytes(), ForkName::Merge) .unwrap(); - assert_eq!( - decode( - Protocol::BlocksByRange, - Version::V2, - &mut encoded, - ForkName::Merge, - ) - .unwrap_err(), - RPCError::InvalidData, + assert!( + matches!( + decode( + Protocol::BlocksByRange, + Version::V2, + &mut encoded, + ForkName::Merge, + ) + .unwrap_err(), + RPCError::InvalidData(_) + ), "Decoding a block larger than max_rpc_size should fail" ); @@ -1010,15 +1028,17 @@ mod tests { encode_without_length_checks(merge_block_large.as_ssz_bytes(), ForkName::Merge) .unwrap(); - assert_eq!( - decode( - Protocol::BlocksByRoot, - Version::V2, - &mut encoded, - ForkName::Merge, - ) - .unwrap_err(), - RPCError::InvalidData, + assert!( + matches!( + decode( + Protocol::BlocksByRoot, + Version::V2, + &mut encoded, + ForkName::Merge, + ) + .unwrap_err(), + RPCError::InvalidData(_) + ), "Decoding a block larger than max_rpc_size should fail" ); @@ -1258,10 +1278,10 @@ mod tests { dst.extend_from_slice(writer.get_ref()); // 10 (for stream identifier) + 80 + 42 = 132 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`. - assert_eq!( + assert!(matches!( decode(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(), - RPCError::InvalidData - ); + RPCError::InvalidData(_) + )); } /// Test a malicious snappy encoding for a V2 `BlocksByRange` message where the attacker @@ -1315,7 +1335,7 @@ mod tests { dst.extend_from_slice(writer.get_ref()); // 10 (for stream identifier) + 176156 + 8103 = 184269 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`. 
- assert_eq!( + assert!(matches!( decode( Protocol::BlocksByRange, Version::V2, @@ -1323,8 +1343,8 @@ mod tests { ForkName::Altair ) .unwrap_err(), - RPCError::InvalidData - ); + RPCError::InvalidData(_) + )); } /// Test sending a message with encoded length prefix > max_rpc_size. @@ -1360,9 +1380,9 @@ mod tests { writer.flush().unwrap(); dst.extend_from_slice(writer.get_ref()); - assert_eq!( + assert!(matches!( decode(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(), - RPCError::InvalidData - ); + RPCError::InvalidData(_) + )); } } diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index 2b9e7c49020..b685c43348a 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -477,7 +477,7 @@ where ProtocolError::InvalidMessage | ProtocolError::TooManyProtocols => { // Peer is sending invalid data during the negotiation phase, not // participating in the protocol - RPCError::InvalidData + RPCError::InvalidData("Invalid message during negotiation".to_string()) } }, }; diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index c0431253735..94b71d20083 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -533,7 +533,7 @@ pub enum RPCError { /// Stream ended unexpectedly. IncompleteStream, /// Peer sent invalid data. - InvalidData, + InvalidData(String), /// An error occurred due to internal reasons. Ex: timer failure. InternalError(&'static str), /// Negotiation with this peer timed out. @@ -567,7 +567,7 @@ impl std::fmt::Display for RPCError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match *self { RPCError::SSZDecodeError(ref err) => write!(f, "Error while decoding ssz: {:?}", err), - RPCError::InvalidData => write!(f, "Peer sent unexpected data"), + RPCError::InvalidData(ref err) => write!(f, "Peer sent unexpected data: {}", err), RPCError::IoError(ref err) => write!(f, "IO Error: {}", err), RPCError::ErrorResponse(ref code, ref reason) => write!( f, @@ -594,7 +594,7 @@ impl std::error::Error for RPCError { RPCError::StreamTimeout => None, RPCError::UnsupportedProtocol => None, RPCError::IncompleteStream => None, - RPCError::InvalidData => None, + RPCError::InvalidData(_) => None, RPCError::InternalError(_) => None, RPCError::ErrorResponse(_, _) => None, RPCError::NegotiationTimeout => None, From f2cf9d3b30761c2f85704245b756383e26febad2 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Fri, 1 Apr 2022 00:54:23 +0530 Subject: [PATCH 10/10] Increase small merge block size --- beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs | 2 +- beacon_node/lighthouse_network/tests/rpc_tests.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index 54925c3f592..188ae59b6f9 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -675,7 +675,7 @@ mod tests { let mut block: BeaconBlockMerge<_, FullPayload> = BeaconBlockMerge::empty(&Spec::default_spec()); let tx = VariableList::from(vec![0; 1024]); - let txs = VariableList::from(std::iter::repeat(tx).take(10000).collect::>()); + let txs = VariableList::from(std::iter::repeat(tx).take(5000).collect::>()); 
block.body.execution_payload.execution_payload.transactions = txs; diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index f9db2962606..5895d32d5dc 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -23,7 +23,7 @@ type E = MinimalEthSpec; fn merge_block_small(fork_context: &ForkContext) -> BeaconBlock { let mut block = BeaconBlockMerge::::empty(&E::default_spec()); let tx = VariableList::from(vec![0; 1024]); - let txs = VariableList::from(std::iter::repeat(tx).take(100).collect::>()); + let txs = VariableList::from(std::iter::repeat(tx).take(5000).collect::>()); block.body.execution_payload.execution_payload.transactions = txs;
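
Taken together, these patches replace the old min/max-across-forks response bounds with limits keyed to the connection's current fork, and cap post-merge responses at MAX_RPC_SIZE_POST_MERGE rather than the ~16 GiB theoretical merge-block maximum. The standalone Rust sketch below illustrates that pattern only: the ForkName and RpcLimits names mirror the Lighthouse types, but the numeric bounds are placeholders (the real SIGNED_BEACON_BLOCK_*_MIN/MAX values are derived from SSZ encodings of empty and full blocks), and none of this is the actual lighthouse_network API.

// Toy illustration of fork-aware RPC response limits.
// Names mirror the Lighthouse types; the numeric bounds are placeholders.

#[derive(Clone, Copy, Debug, PartialEq)]
enum ForkName {
    Base,
    Altair,
    Merge,
}

#[derive(Debug, PartialEq)]
struct RpcLimits {
    min: usize,
    max: usize,
}

impl RpcLimits {
    fn new(min: usize, max: usize) -> Self {
        Self { min, max }
    }

    // A chunk is rejected if its uncompressed length falls outside [min, max]
    // or exceeds the negotiated maximum packet size for the current fork.
    fn is_out_of_bounds(&self, length: usize, max_packet_size: usize) -> bool {
        length > std::cmp::min(self.max, max_packet_size) || length < self.min
    }
}

const MAX_RPC_SIZE: usize = 1_048_576; // placeholder pre-merge cap (1 MiB)
const MAX_RPC_SIZE_POST_MERGE: usize = 10 * 1_048_576; // placeholder post-merge cap

fn max_rpc_size(current_fork: ForkName) -> usize {
    match current_fork {
        ForkName::Merge => MAX_RPC_SIZE_POST_MERGE,
        ForkName::Altair | ForkName::Base => MAX_RPC_SIZE,
    }
}

// Mirrors the BlocksByRange / BlocksByRoot arms after these patches:
// one fork, one pair of bounds, instead of min/max folded over all forks.
fn block_response_limits(current_fork: ForkName) -> RpcLimits {
    let (min, max) = match current_fork {
        ForkName::Base => (304, 157_756),               // placeholder, not real SSZ sizes
        ForkName::Altair => (377, 157_916),             // placeholder, not real SSZ sizes
        ForkName::Merge => (505, MAX_RPC_SIZE_POST_MERGE),
    };
    RpcLimits::new(min, max)
}

fn main() {
    for fork in [ForkName::Base, ForkName::Altair, ForkName::Merge] {
        let limits = block_response_limits(fork);
        let packet = max_rpc_size(fork);
        println!(
            "{:?}: min {} max {}, oversized chunk rejected: {}",
            fork,
            limits.min,
            limits.max,
            limits.is_out_of_bounds(limits.max + 1, packet)
        );
    }
}

With per-fork limits, the decoder's bounds are as tight as the active fork allows, rather than the loosest bound across all known forks, and oversized payloads are refused before decompression.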