diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/SyncSourceFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/SyncSourceFactory.java index bed9ac22b3f..cb77d13a627 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/SyncSourceFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/SyncSourceFactory.java @@ -14,9 +14,11 @@ package tech.pegasys.teku.beacon.sync.forward.multipeer.chains; import static tech.pegasys.teku.spec.config.Constants.MAX_BLOBS_SIDECARS_PER_MINUTE; +import static tech.pegasys.teku.spec.config.Constants.MAX_BLOB_SIDECARS_PER_MINUTE; import static tech.pegasys.teku.spec.config.Constants.MAX_BLOCKS_PER_MINUTE; import static tech.pegasys.teku.spec.config.Constants.SYNC_BATCH_SIZE; import static tech.pegasys.teku.spec.config.Constants.SYNC_BLOBS_SIDECARS_SIZE; +import static tech.pegasys.teku.spec.config.Constants.SYNC_BLOB_SIDECARS_SIZE; import java.util.HashMap; import java.util.Map; @@ -41,11 +43,18 @@ public SyncSource getOrCreateSyncSource(final Eth2Peer peer) { final int maxBlocksPerMinute = MAX_BLOCKS_PER_MINUTE - SYNC_BATCH_SIZE.intValue() - 1; final int maxBlobsSidecarsPerMinute = MAX_BLOBS_SIDECARS_PER_MINUTE - SYNC_BLOBS_SIDECARS_SIZE.intValue() - 1; + final int maxBlobSidecarsPerMinute = + MAX_BLOB_SIDECARS_PER_MINUTE - SYNC_BLOB_SIDECARS_SIZE.intValue() - 1; return syncSourcesByPeer.computeIfAbsent( peer, source -> new ThrottlingSyncSource( - asyncRunner, timeProvider, source, maxBlocksPerMinute, maxBlobsSidecarsPerMinute)); + asyncRunner, + timeProvider, + source, + maxBlocksPerMinute, + maxBlobsSidecarsPerMinute, + maxBlobSidecarsPerMinute)); } public void onPeerDisconnected(final Eth2Peer peer) { diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSource.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSource.java index c3cfe94615c..fd0332f3b11 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSource.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSource.java @@ -26,26 +26,34 @@ import tech.pegasys.teku.networking.p2p.reputation.ReputationAdjustment; import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar; public class ThrottlingSyncSource implements SyncSource { private static final Logger LOG = LogManager.getLogger(); + private static final long TIME_OUT = 60; + public static final Duration PEER_REQUEST_DELAY = Duration.ofSeconds(3); private final AsyncRunner asyncRunner; private final SyncSource delegate; private final RateTracker blocksRateTracker; private final RateTracker blobsSidecarsRateTracker; + private final RateTracker blobSidecarsRateTracker; public ThrottlingSyncSource( final AsyncRunner asyncRunner, final TimeProvider timeProvider, final SyncSource delegate, final int maxBlocksPerMinute, - final int maxBlobsSidecarsPerMinute) { + final int maxBlobsSidecarsPerMinute, + final int maxBlobSidecarsPerMinute) { this.asyncRunner = asyncRunner; this.delegate = delegate; - this.blocksRateTracker = new RateTracker(maxBlocksPerMinute, 60, timeProvider); - this.blobsSidecarsRateTracker = new RateTracker(maxBlobsSidecarsPerMinute, 60, timeProvider); + this.blocksRateTracker = new RateTracker(maxBlocksPerMinute, TIME_OUT, timeProvider); + this.blobsSidecarsRateTracker = + new RateTracker(maxBlobsSidecarsPerMinute, TIME_OUT, timeProvider); + this.blobSidecarsRateTracker = + new RateTracker(maxBlobSidecarsPerMinute, TIME_OUT, timeProvider); } @Override @@ -58,7 +66,7 @@ public SafeFuture requestBlocksByRange( return delegate.requestBlocksByRange(startSlot, count, listener); } else { return asyncRunner.runAfterDelay( - () -> requestBlocksByRange(startSlot, count, listener), Duration.ofSeconds(3)); + () -> requestBlocksByRange(startSlot, count, listener), PEER_REQUEST_DELAY); } } @@ -72,7 +80,19 @@ public SafeFuture requestBlobsSidecarsByRange( return delegate.requestBlobsSidecarsByRange(startSlot, count, listener); } else { return asyncRunner.runAfterDelay( - () -> requestBlobsSidecarsByRange(startSlot, count, listener), Duration.ofSeconds(3)); + () -> requestBlobsSidecarsByRange(startSlot, count, listener), PEER_REQUEST_DELAY); + } + } + + @Override + public SafeFuture requestBlobSidecarsByRange( + final UInt64 startSlot, final UInt64 count, final RpcResponseListener listener) { + if (blobSidecarsRateTracker.wantToRequestObjects(count.longValue()) > 0) { + LOG.debug("Sending request for {} blob sidecars", count); + return delegate.requestBlobSidecarsByRange(startSlot, count, listener); + } else { + return asyncRunner.runAfterDelay( + () -> requestBlobSidecarsByRange(startSlot, count, listener), PEER_REQUEST_DELAY); } } diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSourceTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSourceTest.java index 4725e12011f..3fbcef70987 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSourceTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSourceTest.java @@ -29,12 +29,14 @@ import tech.pegasys.teku.networking.p2p.peer.DisconnectReason; import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar; class ThrottlingSyncSourceTest { private static final int MAX_BLOCKS_PER_MINUTE = 100; private static final int MAX_BLOBS_SIDECARS_PER_MINUTE = 100; + private static final int MAX_BLOB_SIDECARS_PER_MINUTE = 100; private final StubAsyncRunner asyncRunner = new StubAsyncRunner(); private final StubTimeProvider timeProvider = StubTimeProvider.withTimeInSeconds(0); private final SyncSource delegate = mock(SyncSource.class); @@ -47,13 +49,18 @@ class ThrottlingSyncSourceTest { private final RpcResponseListener blobsSidecarsListener = mock(RpcResponseListener.class); + @SuppressWarnings("unchecked") + private final RpcResponseListener blobSidecarsListener = + mock(RpcResponseListener.class); + private final ThrottlingSyncSource source = new ThrottlingSyncSource( asyncRunner, timeProvider, delegate, MAX_BLOCKS_PER_MINUTE, - MAX_BLOBS_SIDECARS_PER_MINUTE); + MAX_BLOBS_SIDECARS_PER_MINUTE, + MAX_BLOB_SIDECARS_PER_MINUTE); @Test void shouldDelegateDisconnectImmediately() { @@ -93,6 +100,21 @@ void shouldRequestBlobsSidecarsImmediatelyIfRateLimitNotExceeded() { .requestBlobsSidecarsByRange(UInt64.valueOf(100), count, blobsSidecarsListener)); } + @Test + void shouldRequestBlobSidecarsImmediatelyIfRateLimitNotExceeded() { + final UInt64 count = UInt64.valueOf(MAX_BLOB_SIDECARS_PER_MINUTE - 1); + ignoreFuture(source.requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener)); + ignoreFuture( + source.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener)); + + // Both requests happen immediately + ignoreFuture( + verify(delegate).requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener)); + ignoreFuture( + verify(delegate) + .requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener)); + } + @Test void shouldDelayRequestIfBlockLimitAlreadyExceeded() { final UInt64 count = UInt64.valueOf(MAX_BLOCKS_PER_MINUTE); @@ -129,6 +151,25 @@ void shouldDelayRequestIfBlobsSidecarsLimitAlreadyExceeded() { .requestBlobsSidecarsByRange(UInt64.valueOf(100), count, blobsSidecarsListener)); } + @Test + void shouldDelayRequestIfBlobSidecarsLimitAlreadyExceeded() { + final UInt64 count = UInt64.valueOf(MAX_BLOB_SIDECARS_PER_MINUTE); + ignoreFuture(source.requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener)); + ignoreFuture( + source.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener)); + + ignoreFuture( + verify(delegate).requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener)); + verifyNoMoreInteractions(delegate); + + timeProvider.advanceTimeBySeconds(61); + asyncRunner.executeQueuedActions(); + + ignoreFuture( + verify(delegate) + .requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener)); + } + @Test void shouldContinueDelayingBlocksRequestIfRequestStillExceeded() { final UInt64 count = UInt64.valueOf(MAX_BLOCKS_PER_MINUTE); @@ -170,4 +211,27 @@ void shouldContinueDelayingBlobsSidecarsRequestIfRequestStillExceeded() { verify(delegate) .requestBlobsSidecarsByRange(UInt64.valueOf(100), count, blobsSidecarsListener)); } + + @Test + void shouldContinueDelayingBlobSidecarsRequestIfRequestStillExceeded() { + final UInt64 count = UInt64.valueOf(MAX_BLOB_SIDECARS_PER_MINUTE); + ignoreFuture(source.requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener)); + ignoreFuture( + source.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener)); + + // Both requests happen immediately + ignoreFuture( + verify(delegate).requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener)); + verifyNoMoreInteractions(delegate); + + timeProvider.advanceTimeBySeconds(30); + asyncRunner.executeQueuedActions(); + verifyNoMoreInteractions(delegate); + + timeProvider.advanceTimeBySeconds(31); + asyncRunner.executeQueuedActions(); + ignoreFuture( + verify(delegate) + .requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener)); + } } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/config/Constants.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/config/Constants.java index 8dac1d5aff1..77d2afb1efc 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/config/Constants.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/config/Constants.java @@ -34,6 +34,7 @@ public class Constants { public static final UInt64 MAX_REQUEST_BLOB_SIDECARS = UInt64.valueOf(128); public static final int MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS = 4096; + public static final int MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS = 4096; // Teku Networking Specific public static final int VALID_BLOCK_SET_SIZE = 1000; @@ -73,6 +74,7 @@ public class Constants { public static final UInt64 SYNC_BLOB_SIDECARS_SIZE = UInt64.valueOf(50); public static final int MAX_BLOCKS_PER_MINUTE = 500; public static final int MAX_BLOBS_SIDECARS_PER_MINUTE = 500; + public static final int MAX_BLOB_SIDECARS_PER_MINUTE = 1000; // Teku Validator Client Specific public static final Duration GENESIS_DATA_RETRY_DELAY = Duration.ofSeconds(10); diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BeaconBlockAndBlobsSidecarByRootRequestMessage.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BeaconBlockAndBlobsSidecarByRootRequestMessage.java index b68fb239f5d..84b7ec64d82 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BeaconBlockAndBlobsSidecarByRootRequestMessage.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BeaconBlockAndBlobsSidecarByRootRequestMessage.java @@ -57,7 +57,7 @@ private BeaconBlockAndBlobsSidecarByRootRequestMessage(final TreeNode node) { } @Override - public int getMaximumRequestChunks() { + public int getMaximumResponseChunks() { return size(); } } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BeaconBlocksByRangeRequestMessage.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BeaconBlocksByRangeRequestMessage.java index a01b13e94f9..bb09fcb5f1d 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BeaconBlocksByRangeRequestMessage.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BeaconBlocksByRangeRequestMessage.java @@ -71,7 +71,7 @@ public UInt64 getMaxSlot() { } @Override - public int getMaximumRequestChunks() { + public int getMaximumResponseChunks() { return Math.toIntExact(getCount().longValue()); } } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BeaconBlocksByRootRequestMessage.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BeaconBlocksByRootRequestMessage.java index ccf530218b0..5de109e5546 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BeaconBlocksByRootRequestMessage.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BeaconBlocksByRootRequestMessage.java @@ -56,7 +56,7 @@ private BeaconBlocksByRootRequestMessage(TreeNode node) { } @Override - public int getMaximumRequestChunks() { + public int getMaximumResponseChunks() { return size(); } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobSidecarsByRangeRequestMessage.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobSidecarsByRangeRequestMessage.java new file mode 100644 index 00000000000..e0ee1071b49 --- /dev/null +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobSidecarsByRangeRequestMessage.java @@ -0,0 +1,72 @@ +/* + * Copyright ConsenSys Software Inc., 2023 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc; + +import tech.pegasys.teku.infrastructure.ssz.containers.Container2; +import tech.pegasys.teku.infrastructure.ssz.containers.ContainerSchema2; +import tech.pegasys.teku.infrastructure.ssz.primitive.SszUInt64; +import tech.pegasys.teku.infrastructure.ssz.schema.SszPrimitiveSchemas; +import tech.pegasys.teku.infrastructure.ssz.tree.TreeNode; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; + +public class BlobSidecarsByRangeRequestMessage + extends Container2 + implements RpcRequest { + + public static class BlobSidecarsByRangeRequestMessageSchema + extends ContainerSchema2 { + + public BlobSidecarsByRangeRequestMessageSchema() { + super( + "BlobSidecarsByRangeRequestMessage", + namedSchema("start_slot", SszPrimitiveSchemas.UINT64_SCHEMA), + namedSchema("count", SszPrimitiveSchemas.UINT64_SCHEMA)); + } + + @Override + public BlobSidecarsByRangeRequestMessage createFromBackingNode(final TreeNode node) { + return new BlobSidecarsByRangeRequestMessage(this, node); + } + } + + public static final BlobSidecarsByRangeRequestMessage.BlobSidecarsByRangeRequestMessageSchema + SSZ_SCHEMA = new BlobSidecarsByRangeRequestMessage.BlobSidecarsByRangeRequestMessageSchema(); + + private BlobSidecarsByRangeRequestMessage( + final BlobSidecarsByRangeRequestMessage.BlobSidecarsByRangeRequestMessageSchema type, + final TreeNode backingNode) { + super(type, backingNode); + } + + public BlobSidecarsByRangeRequestMessage(final UInt64 startSlot, final UInt64 count) { + super(SSZ_SCHEMA, SszUInt64.of(startSlot), SszUInt64.of(count)); + } + + public UInt64 getStartSlot() { + return getField0().get(); + } + + public UInt64 getCount() { + return getField1().get(); + } + + public UInt64 getMaxSlot() { + return getStartSlot().plus(getCount()).minusMinZero(1); + } + + @Override + public int getMaximumResponseChunks() { + return getCount().intValue(); + } +} diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobSidecarsByRootRequestMessage.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobSidecarsByRootRequestMessage.java index 110107bd098..be0c871c49d 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobSidecarsByRootRequestMessage.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobSidecarsByRootRequestMessage.java @@ -55,7 +55,7 @@ private BlobSidecarsByRootRequestMessage( } @Override - public int getMaximumRequestChunks() { + public int getMaximumResponseChunks() { return size(); } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobsSidecarsByRangeRequestMessage.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobsSidecarsByRangeRequestMessage.java index 2e5402c8171..1fa52702c6e 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobsSidecarsByRangeRequestMessage.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobsSidecarsByRangeRequestMessage.java @@ -61,11 +61,11 @@ public UInt64 getCount() { } public UInt64 getMaxSlot() { - return getStartSlot().plus(getCount().minus(1)); + return getStartSlot().plus(getCount()).minusMinZero(1); } @Override - public int getMaximumRequestChunks() { + public int getMaximumResponseChunks() { return getCount().intValue(); } } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/EmptyMessage.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/EmptyMessage.java index 436d275004f..609e392bebc 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/EmptyMessage.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/EmptyMessage.java @@ -45,7 +45,7 @@ public String toString() { } @Override - public int getMaximumRequestChunks() { + public int getMaximumResponseChunks() { return 1; } } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/GoodbyeMessage.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/GoodbyeMessage.java index 660f9060376..9adbcb9896e 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/GoodbyeMessage.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/GoodbyeMessage.java @@ -68,7 +68,7 @@ public UInt64 getReason() { } @Override - public int getMaximumRequestChunks() { + public int getMaximumResponseChunks() { return 0; } } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/PingMessage.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/PingMessage.java index 43257c2ebed..5ba2204ddd8 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/PingMessage.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/PingMessage.java @@ -53,7 +53,7 @@ public UInt64 getSeqNumber() { } @Override - public int getMaximumRequestChunks() { + public int getMaximumResponseChunks() { return 1; } } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/RpcRequest.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/RpcRequest.java index 539bfd7dcb7..bdacaed02aa 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/RpcRequest.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/RpcRequest.java @@ -17,5 +17,5 @@ public interface RpcRequest extends SszData { - int getMaximumRequestChunks(); + int getMaximumResponseChunks(); } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/StatusMessage.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/StatusMessage.java index 52dd39dd4a7..c7bd957cdbb 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/StatusMessage.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/StatusMessage.java @@ -104,7 +104,7 @@ public UInt64 getHeadSlot() { } @Override - public int getMaximumRequestChunks() { + public int getMaximumResponseChunks() { return 1; } } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/metadata/MetadataMessage.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/metadata/MetadataMessage.java index ea9f3d1e87b..c03a2d8a6d9 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/metadata/MetadataMessage.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/metadata/MetadataMessage.java @@ -25,7 +25,7 @@ public interface MetadataMessage extends SszContainer, RpcRequest { SszBitvector getAttnets(); @Override - default int getMaximumRequestChunks() { + default int getMaximumResponseChunks() { return 1; } diff --git a/ethereum/spec/src/test/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobSidecarsByRangeRequestMessageTest.java b/ethereum/spec/src/test/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobSidecarsByRangeRequestMessageTest.java new file mode 100644 index 00000000000..8d9ca29e062 --- /dev/null +++ b/ethereum/spec/src/test/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobSidecarsByRangeRequestMessageTest.java @@ -0,0 +1,58 @@ +/* + * Copyright ConsenSys Software Inc., 2023 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; +import static tech.pegasys.teku.infrastructure.ssz.SszDataAssert.assertThatSszData; + +import java.util.stream.Stream; +import org.apache.tuweni.bytes.Bytes; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; + +public class BlobSidecarsByRangeRequestMessageTest { + + @Test + public void shouldRoundTripViaSsz() { + final BlobSidecarsByRangeRequestMessage request = + new BlobSidecarsByRangeRequestMessage(UInt64.valueOf(2), UInt64.valueOf(3)); + final Bytes data = request.sszSerialize(); + final BlobSidecarsByRangeRequestMessage result = + BlobSidecarsByRangeRequestMessage.SSZ_SCHEMA.sszDeserialize(data); + + assertThatSszData(result).isEqualByAllMeansTo(request); + } + + @ParameterizedTest(name = "startSlot={0}, count={1}") + @MethodSource("getMaxSlotParams") + public void getMaxSlot(final long startSlot, final long count, final long expected) { + final BlobSidecarsByRangeRequestMessage request = + new BlobSidecarsByRangeRequestMessage(UInt64.valueOf(startSlot), UInt64.valueOf(count)); + + assertThat(request.getMaxSlot()).isEqualTo(UInt64.valueOf(expected)); + } + + public static Stream getMaxSlotParams() { + return Stream.of( + Arguments.of(0, 1, 0), + Arguments.of(111, 1, 111), + Arguments.of(0, 2, 1), + Arguments.of(10, 2, 11), + Arguments.of(0, 5, 4), + Arguments.of(10, 5, 14)); + } +} diff --git a/ethereum/spec/src/testFixtures/java/tech/pegasys/teku/spec/generator/ChainBuilder.java b/ethereum/spec/src/testFixtures/java/tech/pegasys/teku/spec/generator/ChainBuilder.java index dfaf336938f..295bdc1b940 100644 --- a/ethereum/spec/src/testFixtures/java/tech/pegasys/teku/spec/generator/ChainBuilder.java +++ b/ethereum/spec/src/testFixtures/java/tech/pegasys/teku/spec/generator/ChainBuilder.java @@ -51,6 +51,7 @@ import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayload; import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadHeader; import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.Blob; +import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar; import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecarSchema; import tech.pegasys.teku.spec.datastructures.interop.GenesisStateBuilder; @@ -88,8 +89,10 @@ public class ChainBuilder { private final AttesterSlashingGenerator attesterSlashingGenerator; private final NavigableMap blocks = new TreeMap<>(); private final NavigableMap blobsSidecars = new TreeMap<>(); + private final NavigableMap> blobSidecars = new TreeMap<>(); private final Map blocksByHash = new HashMap<>(); private final Map blobsSidecarsByHash = new HashMap<>(); + private final Map> blobSidecarsByHash = new HashMap<>(); private final BlockProposalTestUtil blockProposalTestUtil; private final BlobsUtil blobsUtil; @@ -97,7 +100,8 @@ private ChainBuilder( final Spec spec, final List validatorKeys, final Map existingBlocks, - final Map existingBlobsSidecars) { + final Map existingBlobsSidecars, + final Map> existingBlobSidecars) { this.spec = spec; this.validatorKeys = validatorKeys; this.blobsUtil = new BlobsUtil(spec); @@ -106,8 +110,17 @@ private ChainBuilder( blockProposalTestUtil = new BlockProposalTestUtil(spec); blocks.putAll(existingBlocks); blobsSidecars.putAll(existingBlobsSidecars); + blobSidecars.putAll(existingBlobSidecars); existingBlocks.values().forEach(b -> blocksByHash.put(b.getRoot(), b)); blobsSidecars.values().forEach(b -> blobsSidecarsByHash.put(b.getBeaconBlockRoot(), b)); + blobSidecars + .values() + .forEach( + b -> { + if (!b.isEmpty()) { + blobSidecarsByHash.put(b.get(0).getBlockRoot(), b); + } + }); } public static ChainBuilder create(final Spec spec) { @@ -115,7 +128,12 @@ public static ChainBuilder create(final Spec spec) { } public static ChainBuilder create(final Spec spec, final List validatorKeys) { - return new ChainBuilder(spec, validatorKeys, Collections.emptyMap(), Collections.emptyMap()); + return new ChainBuilder( + spec, + validatorKeys, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap()); } public Optional getBlock(final Bytes32 blockRoot) { @@ -130,6 +148,10 @@ public Optional getBlobsSidecar(final Bytes32 blockRoot) { return Optional.ofNullable(blobsSidecarsByHash.get(blockRoot)); } + public Optional> getBlobSidecars(final Bytes32 blockRoot) { + return Optional.ofNullable(blobSidecarsByHash.get(blockRoot)); + } + public Optional getBlockAndBlobsSidecar( final Bytes32 blockRoot) { return getBlock(blockRoot) @@ -157,7 +179,7 @@ public Optional getBlockAndBlobsSidecar( * @return An independent copy of this ChainBuilder */ public ChainBuilder fork() { - return new ChainBuilder(spec, validatorKeys, blocks, blobsSidecars); + return new ChainBuilder(spec, validatorKeys, blocks, blobsSidecars, blobSidecars); } public List getValidatorKeys() { @@ -210,12 +232,24 @@ public Stream streamBlobsSidecars(final long fromSlot, final long return streamBlobsSidecars(UInt64.valueOf(fromSlot), UInt64.valueOf(toSlot)); } + public Stream streamBlobSidecars(final long fromSlot, final long toSlot) { + return streamBlobSidecars(UInt64.valueOf(fromSlot), UInt64.valueOf(toSlot)); + } + public Stream streamBlobsSidecars(final UInt64 fromSlot, final UInt64 toSlot) { return blobsSidecars.values().stream() .filter(s -> s.getBeaconBlockSlot().isGreaterThanOrEqualTo(fromSlot)) .filter(s -> s.getBeaconBlockSlot().isLessThanOrEqualTo(toSlot)); } + public Stream streamBlobSidecars(final UInt64 fromSlot, final UInt64 toSlot) { + return blobSidecars.values().stream() + .filter(blobs -> !blobs.isEmpty()) + .filter(blobs -> blobs.get(0).getSlot().isGreaterThanOrEqualTo(fromSlot)) + .filter(blobs -> blobs.get(0).getSlot().isLessThanOrEqualTo(toSlot)) + .flatMap(List::stream); + } + public Stream streamBlobsSidecars() { return blobsSidecars.values().stream(); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DefaultEth2Peer.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DefaultEth2Peer.java index 1790b3ddacd..1ede9136357 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DefaultEth2Peer.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DefaultEth2Peer.java @@ -56,6 +56,7 @@ import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRangeRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRootRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRangeRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobsSidecarsByRangeRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.EmptyMessage; @@ -82,8 +83,9 @@ class DefaultEth2Peer extends DelegatingPeer implements Eth2Peer { private final AtomicInteger unansweredPings = new AtomicInteger(); private final RateTracker blockRequestTracker; private final RateTracker blobsSidecarsRequestTracker; + private final RateTracker blobSidecarsRequestTracker; private final RateTracker requestTracker; - private final Supplier firstSlotSupportingBlobsSidecarsByRange; + private final Supplier firstSlotSupportingBlobSidecarsByRange; DefaultEth2Peer( final Spec spec, @@ -94,6 +96,7 @@ class DefaultEth2Peer extends DelegatingPeer implements Eth2Peer { final PeerChainValidator peerChainValidator, final RateTracker blockRequestTracker, final RateTracker blobsSidecarsRequestTracker, + final RateTracker blobSidecarsRequestTracker, final RateTracker requestTracker) { super(peer); this.rpcMethods = rpcMethods; @@ -102,8 +105,9 @@ class DefaultEth2Peer extends DelegatingPeer implements Eth2Peer { this.peerChainValidator = peerChainValidator; this.blockRequestTracker = blockRequestTracker; this.blobsSidecarsRequestTracker = blobsSidecarsRequestTracker; + this.blobSidecarsRequestTracker = blobSidecarsRequestTracker; this.requestTracker = requestTracker; - this.firstSlotSupportingBlobsSidecarsByRange = + this.firstSlotSupportingBlobSidecarsByRange = Suppliers.memoize( () -> { final UInt64 denebForkEpoch = @@ -325,7 +329,7 @@ public SafeFuture requestBlobsSidecarsByRange( .blobsSidecarsByRange() .map( method -> { - final UInt64 firstSupportedSlot = firstSlotSupportingBlobsSidecarsByRange.get(); + final UInt64 firstSupportedSlot = firstSlotSupportingBlobSidecarsByRange.get(); final BlobsSidecarsByRangeRequestMessage request; if (startSlot.isLessThan(firstSupportedSlot)) { LOG.debug( @@ -343,6 +347,36 @@ public SafeFuture requestBlobsSidecarsByRange( .orElse(failWithUnsupportedMethodException("BlobsSidecarsByRange")); } + @Override + public SafeFuture requestBlobSidecarsByRange( + final UInt64 startSlot, final UInt64 count, final RpcResponseListener listener) { + return rpcMethods + .blobSidecarsByRange() + .map( + method -> { + final UInt64 firstSupportedSlot = firstSlotSupportingBlobSidecarsByRange.get(); + final BlobSidecarsByRangeRequestMessage request; + if (startSlot.isLessThan(firstSupportedSlot)) { + LOG.debug( + "Requesting blob sidecars from slot {} instead of slot {} because the request is spanning the Deneb fork transition", + firstSupportedSlot, + startSlot); + final UInt64 updatedCount = + count.minusMinZero(firstSupportedSlot.minusMinZero(startSlot)); + if (updatedCount.isZero()) { + return SafeFuture.COMPLETE; + } + request = new BlobSidecarsByRangeRequestMessage(firstSupportedSlot, updatedCount); + } else { + request = new BlobSidecarsByRangeRequestMessage(startSlot, count); + } + return requestStream(method, request, listener); + }) + .orElse( + SafeFuture.failedFuture( + new UnsupportedOperationException("BlobSidecarsByRange method is not available"))); + } + @Override public SafeFuture requestMetadata() { return requestSingleItem(rpcMethods.getMetadata(), EmptyMessage.EMPTY_MESSAGE); @@ -371,7 +405,8 @@ public boolean wantToReceiveBlobsSidecars( @Override public boolean wantToReceiveBlobSidecars( final ResponseCallback callback, final long blobSidecarsCount) { - throw new UnsupportedOperationException("Not yet implemented"); + return wantToReceiveObjects( + "blob sidecars", blobSidecarsRequestTracker, callback, blobSidecarsCount); } @Override diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2Peer.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2Peer.java index b0bd502c689..0961ff67fd0 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2Peer.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2Peer.java @@ -48,6 +48,7 @@ static Eth2Peer create( final PeerChainValidator peerChainValidator, final RateTracker blockRequestTracker, final RateTracker blobsSidecarsRequestTracker, + final RateTracker blobSidecarsRequestTracker, final RateTracker requestTracker) { return new DefaultEth2Peer( spec, @@ -58,6 +59,7 @@ static Eth2Peer create( peerChainValidator, blockRequestTracker, blobsSidecarsRequestTracker, + blobSidecarsRequestTracker, requestTracker); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerFactory.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerFactory.java index ee9366be2c9..3b1151265e0 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerFactory.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerFactory.java @@ -26,6 +26,7 @@ public class Eth2PeerFactory { + private static final long TIME_OUT = 60; private final Spec spec; private final StatusMessageFactory statusMessageFactory; private final MetadataMessagesFactory metadataMessagesFactory; @@ -65,8 +66,9 @@ public Eth2Peer create(final Peer peer, final BeaconChainMethods rpcMethods) { statusMessageFactory, metadataMessagesFactory, PeerChainValidator.create(spec, metricsSystem, chainDataClient, requiredCheckpoint), - new RateTracker(peerRateLimit, 60, timeProvider), - new RateTracker(peerRateLimit, 60, timeProvider), - new RateTracker(peerRequestLimit, 60, timeProvider)); + new RateTracker(peerRateLimit, TIME_OUT, timeProvider), + new RateTracker(peerRateLimit, TIME_OUT, timeProvider), + new RateTracker(peerRateLimit, TIME_OUT, timeProvider), + new RateTracker(peerRequestLimit, TIME_OUT, timeProvider)); } } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/SyncSource.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/SyncSource.java index 0f669b775d8..bdad6cc9eeb 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/SyncSource.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/SyncSource.java @@ -19,6 +19,7 @@ import tech.pegasys.teku.networking.p2p.reputation.ReputationAdjustment; import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar; /** @@ -32,6 +33,9 @@ SafeFuture requestBlocksByRange( SafeFuture requestBlobsSidecarsByRange( UInt64 startSlot, UInt64 count, RpcResponseListener listener); + SafeFuture requestBlobSidecarsByRange( + UInt64 startSlot, UInt64 count, RpcResponseListener listener); + void adjustReputation(final ReputationAdjustment adjustment); SafeFuture disconnectCleanly(DisconnectReason reason); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodIds.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodIds.java index b199b50b532..3a0a3fb43f5 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodIds.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodIds.java @@ -25,6 +25,7 @@ public class BeaconChainMethodIds { static final String BEACON_BLOCK_AND_BLOBS_SIDECAR_BY_ROOT = "/eth2/beacon_chain/req/beacon_block_and_blobs_sidecar_by_root"; static final String BLOBS_SIDECARS_BY_RANGE = "/eth2/beacon_chain/req/blobs_sidecars_by_range"; + static final String BLOB_SIDECARS_BY_RANGE = "/eth2/beacon_chain/req/blob_sidecars_by_range"; static final String GET_METADATA = "/eth2/beacon_chain/req/metadata"; static final String PING = "/eth2/beacon_chain/req/ping"; @@ -56,6 +57,11 @@ public static String getBlobsSidecarsByRangeMethodId( return getMethodId(BLOBS_SIDECARS_BY_RANGE, version, encoding); } + public static String getBlobSidecarsByRangeMethodId( + final int version, final RpcEncoding encoding) { + return getMethodId(BLOB_SIDECARS_BY_RANGE, version, encoding); + } + public static String getStatusMethodId(final int version, final RpcEncoding encoding) { return getMethodId(STATUS, version, encoding); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java index 0c36bb11f89..adc93f1cb28 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java @@ -15,6 +15,7 @@ import static tech.pegasys.teku.spec.config.Constants.MAX_BLOCK_BY_RANGE_REQUEST_SIZE; import static tech.pegasys.teku.spec.config.Constants.MAX_REQUEST_BLOBS_SIDECARS; +import static tech.pegasys.teku.spec.config.Constants.MAX_REQUEST_BLOB_SIDECARS; import java.util.ArrayList; import java.util.Collection; @@ -30,6 +31,7 @@ import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BeaconBlockAndBlobsSidecarByRootMessageHandler; import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BeaconBlocksByRangeMessageHandler; import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BeaconBlocksByRootMessageHandler; +import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BlobSidecarsByRangeMessageHandler; import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BlobSidecarsByRootMessageHandler; import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BlobsSidecarsByRangeMessageHandler; import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.GoodbyeMessageHandler; @@ -47,6 +49,7 @@ import tech.pegasys.teku.networking.p2p.rpc.RpcMethod; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.SpecMilestone; +import tech.pegasys.teku.spec.config.SpecConfigDeneb; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBeaconBlockAndBlobsSidecar; import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobSidecar; @@ -57,6 +60,7 @@ import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRangeRequestMessage.BeaconBlocksByRangeRequestMessageSchema; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRootRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRootRequestMessage.BeaconBlocksByRootRequestMessageSchema; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRangeRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobsSidecarsByRangeRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobsSidecarsByRangeRequestMessage.BlobsSidecarsByRangeRequestMessageSchema; @@ -83,6 +87,8 @@ public class BeaconChainMethods { beaconBlockAndBlobsSidecarByRoot; private final Optional> blobsSidecarsByRange; + private final Optional> + blobSidecarsByRange; private final Optional> blobSidecarsByRoot; private final Eth2RpcMethod getMetadata; @@ -103,6 +109,8 @@ private BeaconChainMethods( blobsSidecarsByRange, final Optional> blobSidecarsByRoot, + final Optional> + blobSidecarsByRange, final Eth2RpcMethod getMetadata, final Eth2RpcMethod ping) { this.status = status; @@ -112,6 +120,7 @@ private BeaconChainMethods( this.beaconBlockAndBlobsSidecarByRoot = beaconBlockAndBlobsSidecarByRoot; this.blobsSidecarsByRange = blobsSidecarsByRange; this.blobSidecarsByRoot = blobSidecarsByRoot; + this.blobSidecarsByRange = blobSidecarsByRange; this.getMetadata = getMetadata; this.ping = ping; this.allMethods = @@ -120,6 +129,7 @@ private BeaconChainMethods( blobSidecarsByRoot.ifPresent(allMethods::add); beaconBlockAndBlobsSidecarByRoot().ifPresent(allMethods::add); blobsSidecarsByRange.ifPresent(allMethods::add); + blobSidecarsByRange.ifPresent(allMethods::add); } public static BeaconChainMethods create( @@ -163,6 +173,14 @@ public static BeaconChainMethods create( peerLookup, rpcEncoding, recentChainData), + createBlobSidecarsByRange( + spec, + metricsSystem, + asyncRunner, + combinedChainDataClient, + peerLookup, + rpcEncoding, + recentChainData), createMetadata(spec, asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding), createPing(asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding)); } @@ -393,6 +411,52 @@ private static Eth2RpcMethod createGoodBye( peerLookup)); } + private static Optional> + createBlobSidecarsByRange( + final Spec spec, + final MetricsSystem metricsSystem, + final AsyncRunner asyncRunner, + final CombinedChainDataClient combinedChainDataClient, + final PeerLookup peerLookup, + final RpcEncoding rpcEncoding, + final RecentChainData recentChainData) { + + if (!spec.isMilestoneSupported(SpecMilestone.DENEB)) { + return Optional.empty(); + } + + final BlobSidecarsByRangeRequestMessage.BlobSidecarsByRangeRequestMessageSchema requestType = + BlobSidecarsByRangeRequestMessage.SSZ_SCHEMA; + + final RpcContextCodec forkDigestContextCodec = + RpcContextCodec.forkDigest(spec, recentChainData, ForkDigestPayloadContext.BLOB_SIDECAR); + + final int maxBlobsPerBlock = + SpecConfigDeneb.required(spec.forMilestone(SpecMilestone.DENEB).getConfig()) + .getMaxBlobsPerBlock(); + + final BlobSidecarsByRangeMessageHandler blobSidecarsByRangeHandler = + new BlobSidecarsByRangeMessageHandler( + spec, + getDenebForkEpoch(spec), + metricsSystem, + combinedChainDataClient, + MAX_REQUEST_BLOB_SIDECARS.times(maxBlobsPerBlock), + UInt64.valueOf(maxBlobsPerBlock)); + + return Optional.of( + new SingleProtocolEth2RpcMethod<>( + asyncRunner, + BeaconChainMethodIds.BLOB_SIDECARS_BY_RANGE, + 1, + rpcEncoding, + requestType, + true, + forkDigestContextCodec, + blobSidecarsByRangeHandler, + peerLookup)); + } + private static Eth2RpcMethod createMetadata( final Spec spec, final AsyncRunner asyncRunner, @@ -517,6 +581,11 @@ public Eth2RpcMethod beaco return blobSidecarsByRoot; } + public Optional> + blobSidecarsByRange() { + return blobSidecarsByRange; + } + public Eth2RpcMethod getMetadata() { return getMetadata; } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobSidecarsByRangeMessageHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobSidecarsByRangeMessageHandler.java new file mode 100644 index 00000000000..f81386a162d --- /dev/null +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobSidecarsByRangeMessageHandler.java @@ -0,0 +1,283 @@ +/* + * Copyright ConsenSys Software Inc., 2023 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods; + +import static tech.pegasys.teku.spec.config.Constants.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; +import java.nio.channels.ClosedChannelException; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes32; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.plugin.services.metrics.Counter; +import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; +import org.jetbrains.annotations.NotNull; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.networking.eth2.peers.Eth2Peer; +import tech.pegasys.teku.networking.eth2.rpc.core.PeerRequiredLocalMessageHandler; +import tech.pegasys.teku.networking.eth2.rpc.core.ResponseCallback; +import tech.pegasys.teku.networking.eth2.rpc.core.RpcException; +import tech.pegasys.teku.networking.p2p.rpc.StreamClosedException; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobSidecar; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRangeRequestMessage; +import tech.pegasys.teku.storage.client.CombinedChainDataClient; + +public class BlobSidecarsByRangeMessageHandler + extends PeerRequiredLocalMessageHandler { + + private static final Logger LOG = LogManager.getLogger(); + + private final Spec spec; + private final UInt64 denebForkEpoch; + private final CombinedChainDataClient combinedChainDataClient; + private final UInt64 maxRequestSize; + private final UInt64 maxBlobsPerBlock; + private final LabelledMetric requestCounter; + private final Counter totalBlobSidecarsRequestedCounter; + + public BlobSidecarsByRangeMessageHandler( + final Spec spec, + final UInt64 denebForkEpoch, + final MetricsSystem metricsSystem, + final CombinedChainDataClient combinedChainDataClient, + final UInt64 maxRequestSize, + final UInt64 maxBlobsPerBlock) { + this.spec = spec; + this.denebForkEpoch = denebForkEpoch; + this.combinedChainDataClient = combinedChainDataClient; + this.maxRequestSize = maxRequestSize; + this.maxBlobsPerBlock = maxBlobsPerBlock; + requestCounter = + metricsSystem.createLabelledCounter( + TekuMetricCategory.NETWORK, + "rpc_blob_sidecars_by_range_requests_total", + "Total number of blob sidecars by range requests received", + "status"); + totalBlobSidecarsRequestedCounter = + metricsSystem.createCounter( + TekuMetricCategory.NETWORK, + "rpc_blob_sidecars_by_range_requested_sidecars_total", + "Total number of blob sidecars requested in accepted blob sidecars by range requests from peers"); + } + + @Override + public void onIncomingMessage( + final String protocolId, + final Eth2Peer peer, + final BlobSidecarsByRangeRequestMessage message, + final ResponseCallback callback) { + final UInt64 startSlot = message.getStartSlot(); + LOG.trace( + "Peer {} requested {} blob sidecars starting at slot {}.", + peer.getId(), + message.getCount(), + startSlot); + + if (!peer.wantToMakeRequest() + || !peer.wantToReceiveBlobSidecars( + callback, maxRequestSize.min(message.getCount()).longValue())) { + requestCounter.labels("rate_limited").inc(); + return; + } + requestCounter.labels("ok").inc(); + totalBlobSidecarsRequestedCounter.inc(message.getCount().longValue()); + + final Bytes32 headBlockRoot = + combinedChainDataClient + .getBestBlockRoot() + .orElseThrow( + () -> + new IllegalStateException( + "Can't retrieve the block root chosen by fork choice.")); + + combinedChainDataClient + .getEarliestAvailableBlobSidecarEpoch() + .thenCompose( + earliestAvailableEpoch -> { + final UInt64 requestEpoch = spec.computeEpochAtSlot(startSlot); + if (checkRequestInMinEpochsRange(requestEpoch) + && !checkBlobSidecarsAreAvailable(earliestAvailableEpoch, requestEpoch)) { + return SafeFuture.failedFuture( + new RpcException.ResourceUnavailableException( + "Requested blob sidecars are not available.")); + } + final BlobSidecarsByRangeMessageHandler.RequestState initialState = + new BlobSidecarsByRangeMessageHandler.RequestState( + callback, headBlockRoot, startSlot, message.getMaxSlot()); + if (initialState.isComplete()) { + return SafeFuture.completedFuture(initialState); + } + return sendBlobSidecars(initialState); + }) + .finish( + requestState -> { + final int sentBlobSidecars = requestState.sentBlobSidecars.get(); + LOG.trace("Sent {} blob sidecars to peer {}.", sentBlobSidecars, peer.getId()); + callback.completeSuccessfully(); + }, + error -> handleProcessingRequestError(error, callback)); + } + + private boolean checkBlobSidecarsAreAvailable( + final Optional earliestAvailableSidecarEpoch, final UInt64 requestEpoch) { + return earliestAvailableSidecarEpoch + .map(earliestEpoch -> earliestEpoch.isLessThanOrEqualTo(requestEpoch)) + .orElse(false); + } + + private boolean checkRequestInMinEpochsRange(final UInt64 requestEpoch) { + final UInt64 currentEpoch = combinedChainDataClient.getCurrentEpoch(); + final UInt64 minEpochForBlobsSidecar = + denebForkEpoch.max(currentEpoch.minusMinZero(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS)); + return requestEpoch.isGreaterThanOrEqualTo(minEpochForBlobsSidecar) + && requestEpoch.isLessThanOrEqualTo(currentEpoch); + } + + private SafeFuture sendBlobSidecars(final RequestState requestState) { + return requestState + .loadNextBlobSidecar() + .thenCompose( + maybeBlobSidecar -> + maybeBlobSidecar.map(requestState::sendBlobSidecar).orElse(SafeFuture.COMPLETE)) + .thenCompose( + __ -> { + requestState.incrementSlotAndIndex(); + if (requestState.isComplete()) { + return SafeFuture.completedFuture(requestState); + } else { + return sendBlobSidecars(requestState); + } + }); + } + + private void handleProcessingRequestError( + final Throwable error, final ResponseCallback callback) { + final Throwable rootCause = Throwables.getRootCause(error); + if (rootCause instanceof RpcException) { + LOG.trace("Rejecting blob sidecars by range request", error); + callback.completeWithErrorResponse((RpcException) rootCause); + } else { + if (rootCause instanceof StreamClosedException + || rootCause instanceof ClosedChannelException) { + LOG.trace("Stream closed while sending requested blobs sidecars", error); + } else { + LOG.error("Failed to process blob sidecars request", error); + } + callback.completeWithUnexpectedError(error); + } + } + + @VisibleForTesting + class RequestState { + + private final AtomicInteger sentBlobSidecars = new AtomicInteger(0); + private final ResponseCallback callback; + private final Bytes32 headBlockRoot; + private final AtomicReference currentSlot; + private final AtomicReference currentIndex; + private final UInt64 maxSlot; + + private Optional maybeCurrentBlock = Optional.empty(); + + RequestState( + final ResponseCallback callback, + final Bytes32 headBlockRoot, + final UInt64 currentSlot, + final UInt64 maxSlot) { + this.callback = callback; + this.headBlockRoot = headBlockRoot; + this.currentSlot = new AtomicReference<>(currentSlot); + this.currentIndex = new AtomicReference<>(UInt64.ZERO); + this.maxSlot = maxSlot; + } + + @VisibleForTesting + UInt64 getCurrentSlot() { + return currentSlot.get(); + } + + @VisibleForTesting + UInt64 getCurrentIndex() { + return currentIndex.get(); + } + + SafeFuture sendBlobSidecar(final BlobSidecar blobSidecar) { + return callback.respond(blobSidecar).thenRun(sentBlobSidecars::incrementAndGet); + } + + SafeFuture> loadNextBlobSidecar() { + // currentBlock is used to avoid querying the combinedChainDataClient for the same slot again + if (maybeCurrentBlock.isEmpty()) { + return combinedChainDataClient + .getBlockAtSlotExact(currentSlot.get(), headBlockRoot) + .thenCompose( + block -> { + maybeCurrentBlock = block; + return retrieveBlobSidecar(); + }); + } else { + return retrieveBlobSidecar(); + } + } + + boolean isComplete() { + return currentSlot.get().isGreaterThan(maxSlot) + || maxRequestSize.isLessThanOrEqualTo(sentBlobSidecars.get()); + } + + void incrementSlotAndIndex() { + if (currentIndex.get().equals(maxBlobsPerBlock.minus(UInt64.ONE))) { + currentIndex.set(UInt64.ZERO); + currentSlot.updateAndGet(UInt64::increment); + } else { + currentIndex.updateAndGet(UInt64::increment); + } + } + + @NotNull + private SafeFuture> retrieveBlobSidecar() { + SafeFuture> blobSidecar = + maybeCurrentBlock + .map(SignedBeaconBlock::getRoot) + .map( + blockRoot -> + combinedChainDataClient.getBlobSidecarByBlockRootAndIndex( + blockRoot, currentIndex.get())) + .orElse(SafeFuture.completedFuture(Optional.empty())); + + refreshCurrentBlock(); + + return blobSidecar; + } + + private void refreshCurrentBlock() { + maybeCurrentBlock.ifPresent( + block -> { + if (block.getSlot().equals(currentSlot.get()) + && currentIndex.get().equals(maxBlobsPerBlock.minus(UInt64.ONE))) { + maybeCurrentBlock = Optional.empty(); + } + }); + } + } +} diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobsSidecarsByRangeMessageHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobsSidecarsByRangeMessageHandler.java index 397bf442387..4cd066585d2 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobsSidecarsByRangeMessageHandler.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobsSidecarsByRangeMessageHandler.java @@ -119,6 +119,9 @@ public void onIncomingMessage( } final RequestState initialState = new RequestState(callback, headBlockRoot, startSlot, message.getMaxSlot()); + if (initialState.isComplete()) { + return SafeFuture.completedFuture(initialState); + } return sendBlobsSidecars(initialState); }) .finish( diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java index 5d438e86999..9a1514ef35f 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java @@ -82,7 +82,7 @@ public Eth2OutgoingRequestHandler( final TRequest request, Eth2RpcResponseHandler responseHandler) { this.timeoutRunner = timeoutRunner; - this.maximumResponseChunks = request.getMaximumRequestChunks(); + this.maximumResponseChunks = request.getMaximumResponseChunks(); this.responseHandler = responseHandler; responseStream = new ResponseStream<>(responseHandler); diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerTest.java index cc0e1ab6413..a7b08e12a50 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerTest.java @@ -38,7 +38,9 @@ import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRangeRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobsSidecarsByRangeRequestMessage; import tech.pegasys.teku.spec.util.DataStructureUtil; @@ -55,6 +57,7 @@ class Eth2PeerTest { private final PeerChainValidator peerChainValidator = mock(PeerChainValidator.class); private final RateTracker blockRateTracker = mock(RateTracker.class); private final RateTracker blobsSidecarsRateTracker = mock(RateTracker.class); + private final RateTracker blobSidecarsRateTracker = mock(RateTracker.class); private final RateTracker rateTracker = mock(RateTracker.class); private final PeerStatus randomPeerStatus = randomPeerStatus(); @@ -69,6 +72,7 @@ class Eth2PeerTest { peerChainValidator, blockRateTracker, blobsSidecarsRateTracker, + blobSidecarsRateTracker, rateTracker); @Test @@ -189,6 +193,26 @@ public void shouldModifyRequestSpanningTheDenebForkTransition() { assertThat(request.getCount()).isEqualTo(UInt64.valueOf(6)); } + @Test + @SuppressWarnings({"unchecked", "FutureReturnValueIgnored"}) + public void shouldSetCountToZeroWhenSlotEqualsToDenebTransitionSlot() { + + final Eth2RpcMethod blobSidecarsByRangeMethod = + mock(Eth2RpcMethod.class); + + final RpcStreamController rpcStreamController = + mock(RpcStreamController.class); + + when(rpcMethods.blobSidecarsByRange()).thenReturn(Optional.of(blobSidecarsByRangeMethod)); + + when(peer.sendRequest(any(), any(), any())) + .thenReturn(SafeFuture.completedFuture(rpcStreamController)); + + peer.requestBlobsSidecarsByRange(UInt64.ONE, UInt64.valueOf(7), __ -> SafeFuture.COMPLETE); + + verify(delegate, never()).sendRequest(any(), any(), any()); + } + private PeerStatus randomPeerStatus() { return new PeerStatus( dataStructureUtil.randomBytes4(), diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodsTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodsTest.java index a2193bfa873..518413287f2 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodsTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodsTest.java @@ -156,6 +156,17 @@ public void shouldCreateBlobsSidecarsByRangeWithDenebEnabled() { "/eth2/beacon_chain/req/blobs_sidecars_by_range/1/ssz_snappy")); } + @Test + public void shouldCreateBlobSidecarsByRangeWithDenebEnabled() { + final BeaconChainMethods methods = getMethods(TestSpecFactory.createMinimalDeneb()); + + assertThat(methods.blobSidecarsByRange()) + .hasValueSatisfying( + method -> + assertThat(method.getIds()) + .containsExactly("/eth2/beacon_chain/req/blob_sidecars_by_range/1/ssz_snappy")); + } + @Test public void shouldCreateBeaconBlockAndBlobsSidecarsByRootWithDenebEnabled() { final BeaconChainMethods methods = getMethods(TestSpecFactory.createMinimalDeneb()); diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobSidecarsByRangeMessageHandlerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobSidecarsByRangeMessageHandlerTest.java new file mode 100644 index 00000000000..f4be395922b --- /dev/null +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobSidecarsByRangeMessageHandlerTest.java @@ -0,0 +1,276 @@ +/* + * Copyright ConsenSys Software Inc., 2023 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; +import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ONE; +import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ZERO; +import static tech.pegasys.teku.spec.config.Constants.MAX_CHUNK_SIZE; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.apache.tuweni.bytes.Bytes32; +import org.assertj.core.api.AssertionsForInterfaceTypes; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; +import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.networking.eth2.peers.Eth2Peer; +import tech.pegasys.teku.networking.eth2.rpc.beaconchain.BeaconChainMethodIds; +import tech.pegasys.teku.networking.eth2.rpc.core.ResponseCallback; +import tech.pegasys.teku.networking.eth2.rpc.core.RpcException; +import tech.pegasys.teku.networking.eth2.rpc.core.encodings.RpcEncoding; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobSidecar; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRangeRequestMessage; +import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.storage.client.CombinedChainDataClient; + +public class BlobSidecarsByRangeMessageHandlerTest { + + private static final RpcEncoding RPC_ENCODING = + RpcEncoding.createSszSnappyEncoding(MAX_CHUNK_SIZE); + + private final Spec spec = TestSpecFactory.createMinimalDeneb(); + + private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); + + private final UInt64 denebForkEpoch = UInt64.valueOf(1); + + private final int slotsPerEpoch = spec.getSlotsPerEpoch(ZERO); + + private final UInt64 startSlot = denebForkEpoch.increment().times(slotsPerEpoch); + + private final Bytes32 headBlockRoot = dataStructureUtil.randomBytes32(); + + private final UInt64 count = UInt64.valueOf(5); + + private final UInt64 maxRequestSize = UInt64.valueOf(10); + private final UInt64 maxBlobsPerBlock = UInt64.valueOf(2); + + private final StubMetricsSystem metricsSystem = new StubMetricsSystem(); + + private final Eth2Peer peer = mock(Eth2Peer.class); + + @SuppressWarnings("unchecked") + private final ResponseCallback listener = mock(ResponseCallback.class); + + private final CombinedChainDataClient combinedChainDataClient = + mock(CombinedChainDataClient.class); + + private final String protocolId = + BeaconChainMethodIds.getBlobSidecarsByRangeMethodId(1, RPC_ENCODING); + + private final BlobSidecarsByRangeMessageHandler handler = + new BlobSidecarsByRangeMessageHandler( + spec, + denebForkEpoch, + metricsSystem, + combinedChainDataClient, + maxRequestSize, + maxBlobsPerBlock); + + @BeforeEach + public void setUp() { + when(peer.wantToMakeRequest()).thenReturn(true); + when(peer.wantToReceiveBlobSidecars(listener, count.longValue())).thenReturn(true); + when(combinedChainDataClient.getEarliestAvailableBlobSidecarEpoch()) + .thenReturn(SafeFuture.completedFuture(Optional.of(ZERO))); + when(combinedChainDataClient.getCurrentEpoch()).thenReturn(denebForkEpoch.increment()); + when(combinedChainDataClient.getBestBlockRoot()).thenReturn(Optional.of(headBlockRoot)); + when(listener.respond(any())).thenReturn(SafeFuture.COMPLETE); + } + + @Test + public void validateRequest_validRequest() { + final Optional result = + handler.validateRequest(protocolId, new BlobSidecarsByRangeRequestMessage(startSlot, ONE)); + assertThat(result).isEmpty(); + } + + @Test + public void shouldNotSendBlobSidecarsIfPeerIsRateLimited() { + + when(peer.wantToReceiveBlobSidecars(listener, 5)).thenReturn(false); + + final BlobSidecarsByRangeRequestMessage request = + new BlobSidecarsByRangeRequestMessage(startSlot, count); + + handler.onIncomingMessage(protocolId, peer, request, listener); + + final long rateLimitedCount = + metricsSystem + .getCounter(TekuMetricCategory.NETWORK, "rpc_blob_sidecars_by_range_requests_total") + .getValue("rate_limited"); + + assertThat(rateLimitedCount).isOne(); + + verifyNoInteractions(listener); + } + + @Test + public void shouldSendResourceUnavailableIfBlobSidecarsAreNotAvailable() { + + // current epoch is 5020 + when(combinedChainDataClient.getCurrentEpoch()).thenReturn(UInt64.valueOf(5020)); + + // earliest available sidecar epoch - 5010 + when(combinedChainDataClient.getEarliestAvailableBlobSidecarEpoch()) + .thenReturn(SafeFuture.completedFuture(Optional.of(denebForkEpoch.plus(5009)))); + + // start slot in epoch 5000 within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS range + final BlobSidecarsByRangeRequestMessage request = + new BlobSidecarsByRangeRequestMessage(UInt64.valueOf(5000).times(slotsPerEpoch), count); + + handler.onIncomingMessage(protocolId, peer, request, listener); + + // blob sidecars should be available from epoch 5000, but they are + // available from epoch 5010 + verify(listener) + .completeWithErrorResponse( + new RpcException.ResourceUnavailableException( + "Requested blob sidecars are not available.")); + } + + @Test + public void shouldCompleteSuccessfullyIfRequestNotWithinRange() { + when(combinedChainDataClient.getBlockAtSlotExact(any(), eq(headBlockRoot))) + .thenReturn( + SafeFuture.completedFuture(Optional.of(dataStructureUtil.randomSignedBeaconBlock()))); + + final BlobSidecarsByRangeRequestMessage request = + new BlobSidecarsByRangeRequestMessage(ZERO, count); + + handler.onIncomingMessage(protocolId, peer, request, listener); + + verify(combinedChainDataClient, times(count.times(maxBlobsPerBlock).intValue())) + .getBlobSidecarByBlockRootAndIndex(any(), any()); + + verify(listener, never()).respond(any()); + + verify(listener).completeSuccessfully(); + } + + @Test + public void shouldSendToPeerRequestedNumberOfBlobSidecars() { + + final BlobSidecarsByRangeRequestMessage request = + new BlobSidecarsByRangeRequestMessage(startSlot, count); + + final List expectedSent = + setUpBlobSidecarsData(startSlot, request.getMaxSlot(), headBlockRoot); + + handler.onIncomingMessage(protocolId, peer, request, listener); + + final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(BlobSidecar.class); + + verify(listener, times(count.times(maxBlobsPerBlock).intValue())) + .respond(argumentCaptor.capture()); + + verify(combinedChainDataClient, times(count.intValue())).getBlockAtSlotExact(any(), any()); + + final List actualSent = argumentCaptor.getAllValues(); + + verify(listener).completeSuccessfully(); + + AssertionsForInterfaceTypes.assertThat(actualSent).containsExactlyElementsOf(expectedSent); + } + + @Test + public void shouldIgnoreRequestWhenCountIsZero() { + + final BlobSidecarsByRangeRequestMessage request = + new BlobSidecarsByRangeRequestMessage(startSlot, ZERO); + + when(peer.wantToReceiveBlobSidecars(listener, 0)).thenReturn(true); + + handler.onIncomingMessage(protocolId, peer, request, listener); + + final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(BlobSidecar.class); + + verify(listener, never()).respond(argumentCaptor.capture()); + + final List actualSent = argumentCaptor.getAllValues(); + + verify(listener).completeSuccessfully(); + + AssertionsForInterfaceTypes.assertThat(actualSent).isEmpty(); + } + + @Test + public void shouldIterateThroughIndicesAndSlots() { + + UInt64 count = UInt64.valueOf(5); + UInt64 currentSlot = ZERO; + BlobSidecarsByRangeMessageHandler.RequestState request = + handler.new RequestState(listener, headBlockRoot, currentSlot, count); + + for (int slot = currentSlot.intValue(); slot <= count.intValue(); slot++) { + for (int index = 0; index < maxBlobsPerBlock.intValue(); index++) { + assertThat(request.isComplete()).isFalse(); + assertThat(request.getCurrentSlot()).isEqualTo(UInt64.valueOf(slot)); + assertThat(request.getCurrentIndex()).isEqualTo(UInt64.valueOf(index)); + request.incrementSlotAndIndex(); + } + } + + assertThat(request.isComplete()).isTrue(); + } + + private List setUpBlobSidecarsData( + final UInt64 startSlot, final UInt64 maxSlot, final Bytes32 headBlockRoot) { + List blobSidecars = new ArrayList<>(); + for (int slot = startSlot.intValue(); slot <= maxSlot.intValue(); slot++) { + final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(slot); + when(combinedChainDataClient.getBlockAtSlotExact(UInt64.valueOf(slot), headBlockRoot)) + .thenReturn(SafeFuture.completedFuture(Optional.of(block))); + final BlobSidecar blobSidecar0 = + dataStructureUtil.randomBlobSidecar( + block.getRoot(), + ZERO, + UInt64.valueOf(slot), + block.getParentRoot(), + dataStructureUtil.randomValidatorIndex()); + when(combinedChainDataClient.getBlobSidecarByBlockRootAndIndex(block.getRoot(), ZERO)) + .thenReturn(SafeFuture.completedFuture(Optional.of(blobSidecar0))); + blobSidecars.add(blobSidecar0); + final BlobSidecar blobSidecar1 = + dataStructureUtil.randomBlobSidecar( + block.getRoot(), + ONE, + UInt64.valueOf(slot), + block.getParentRoot(), + dataStructureUtil.randomValidatorIndex()); + when(combinedChainDataClient.getBlobSidecarByBlockRootAndIndex(block.getRoot(), ONE)) + .thenReturn(SafeFuture.completedFuture(Optional.of(blobSidecar1))); + blobSidecars.add(blobSidecar1); + } + + return blobSidecars; + } +} diff --git a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/RespondingEth2Peer.java b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/RespondingEth2Peer.java index 77fc97d6663..b4400c69bb6 100644 --- a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/RespondingEth2Peer.java +++ b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/RespondingEth2Peer.java @@ -230,6 +230,21 @@ public SafeFuture requestBlobsSidecarsByRange( return createPendingBlobsSidecarRequest(handler); } + @Override + public SafeFuture requestBlobSidecarsByRange( + final UInt64 startSlot, final UInt64 count, final RpcResponseListener listener) { + final long lastSlotExclusive = startSlot.longValue() + count.longValue(); + + final PendingRequestHandler handler = + PendingRequestHandler.createForBatchBlobSidecarRequest( + listener, + () -> + chain + .streamBlobSidecars(startSlot.longValue(), lastSlotExclusive + 1) + .collect(Collectors.toList())); + return createPendingBlobSidecarRequest(handler); + } + @Override public SafeFuture requestBlocksByRoot( final List blockRoots, final RpcResponseListener listener) { @@ -330,6 +345,13 @@ private SafeFuture createPendingBlobsSidecarRequest( return request.getFuture(); } + private SafeFuture createPendingBlobSidecarRequest( + final PendingRequestHandler handler) { + final PendingRequest request = new PendingRequest<>(handler); + pendingRequests.add(request); + return request.getFuture(); + } + @Override public SafeFuture requestMetadata() { final MetadataMessage defaultMetadata = @@ -594,6 +616,12 @@ static PendingRequestHandler createForBatchBlobsSidecarReque return createForBatchRequest(listener, blobsSidecarsSupplier); } + static PendingRequestHandler createForBatchBlobSidecarRequest( + final RpcResponseListener listener, + final Supplier> blobSidecarsSupplier) { + return createForBatchRequest(listener, blobSidecarsSupplier); + } + static PendingRequestHandler createForBatchBlockAndBlobsSidecarRequest( final RpcResponseListener listener, diff --git a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/StubSyncSource.java b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/StubSyncSource.java index 8d0a1860de9..7c4eb830741 100644 --- a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/StubSyncSource.java +++ b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/StubSyncSource.java @@ -27,19 +27,23 @@ import tech.pegasys.teku.networking.p2p.reputation.ReputationAdjustment; import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar; public class StubSyncSource implements SyncSource { private final List blocksRequests = new ArrayList<>(); private final List blobsSidecarsRequests = new ArrayList<>(); + private final List blobSidecarsRequests = new ArrayList<>(); private Optional> currentBlockRequest = Optional.empty(); private Optional> currentBlockListener = Optional.empty(); private Optional> currentBlobsSidecarRequest = Optional.empty(); + private Optional> currentBlobSidecarRequest = Optional.empty(); private Optional> currentBlobsSidecarListener = Optional.empty(); + private Optional> currentBlobSidecarListener = Optional.empty(); public void receiveBlocks(final SignedBeaconBlock... blocks) { final RpcResponseListener listener = currentBlockListener.orElseThrow(); @@ -54,6 +58,13 @@ public void receiveBlobsSidecars(final BlobsSidecar... blobsSidecars) { currentBlobsSidecarRequest.orElseThrow().complete(null); } + public void receiveBlobSidecars(final BlobSidecar... blobSidecars) { + final RpcResponseListener listener = currentBlobSidecarListener.orElseThrow(); + Stream.of(blobSidecars) + .forEach(response -> assertThat(listener.onResponse(response)).isCompleted()); + currentBlobSidecarRequest.orElseThrow().complete(null); + } + public void failRequest(final Throwable error) { currentBlockRequest.orElseThrow().completeExceptionally(error); } @@ -84,6 +95,17 @@ public SafeFuture requestBlobsSidecarsByRange( return request; } + @Override + public SafeFuture requestBlobSidecarsByRange( + final UInt64 startSlot, final UInt64 count, final RpcResponseListener listener) { + checkArgument(count.isGreaterThan(UInt64.ZERO), "Count must be greater than zero"); + blobSidecarsRequests.add(new Request(startSlot, count)); + final SafeFuture request = new SafeFuture<>(); + currentBlobSidecarRequest = Optional.of(request); + currentBlobSidecarListener = Optional.of(listener); + return request; + } + @Override public SafeFuture disconnectCleanly(final DisconnectReason reason) { return SafeFuture.COMPLETE; diff --git a/storage/src/main/java/tech/pegasys/teku/storage/client/CombinedChainDataClient.java b/storage/src/main/java/tech/pegasys/teku/storage/client/CombinedChainDataClient.java index f36d4286441..f5bed7c3bfa 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/client/CombinedChainDataClient.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/client/CombinedChainDataClient.java @@ -479,14 +479,19 @@ public SafeFuture> getEarliestAvailableBlobsSidecarEpoch() { .thenApply(slot -> slot.map(spec::computeEpochAtSlot)); } + public SafeFuture> getEarliestAvailableBlobSidecarEpoch() { + return SafeFuture.failedFuture(new UnsupportedOperationException("Not yet implemented")); + } + public SafeFuture> getBlobsSidecarBySlotAndBlockRoot( final UInt64 slot, final Bytes32 blockRoot) { return historicalChainData.getBlobsSidecar(new SlotAndBlockRoot(slot, blockRoot)); } + @SuppressWarnings("unused") public SafeFuture> getBlobSidecarByBlockRootAndIndex( final Bytes32 blockRoot, final UInt64 index) { - throw new UnsupportedOperationException("Not yet implemented"); + return SafeFuture.failedFuture(new UnsupportedOperationException("Not yet implemented")); } private boolean isRecentData(final UInt64 slot) {