diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobIdentifier.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobIdentifier.java new file mode 100644 index 00000000000..7e2c9f0eb4f --- /dev/null +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobIdentifier.java @@ -0,0 +1,57 @@ +/* + * Copyright ConsenSys Software Inc., 2023 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc; + +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.infrastructure.ssz.containers.Container2; +import tech.pegasys.teku.infrastructure.ssz.containers.ContainerSchema2; +import tech.pegasys.teku.infrastructure.ssz.primitive.SszBytes32; +import tech.pegasys.teku.infrastructure.ssz.primitive.SszUInt64; +import tech.pegasys.teku.infrastructure.ssz.schema.SszPrimitiveSchemas; +import tech.pegasys.teku.infrastructure.ssz.tree.TreeNode; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; + +public class BlobIdentifier extends Container2 { + + public static class BlobIdentifierSchema + extends ContainerSchema2 { + + private BlobIdentifierSchema() { + super(SszPrimitiveSchemas.BYTES32_SCHEMA, SszPrimitiveSchemas.UINT64_SCHEMA); + } + + @Override + public BlobIdentifier createFromBackingNode(final TreeNode node) { + return new BlobIdentifier(node); + } + } + + public static final BlobIdentifierSchema SSZ_SCHEMA = new BlobIdentifierSchema(); + + private BlobIdentifier(final TreeNode node) { + super(SSZ_SCHEMA, node); + } + + public BlobIdentifier(final Bytes32 root, final UInt64 index) { + super(SSZ_SCHEMA, SszBytes32.of(root), SszUInt64.of(index)); + } + + public Bytes32 getBlockRoot() { + return getField0().get(); + } + + public UInt64 getIndex() { + return getField1().get(); + } +} diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobSidecarsByRootRequestMessage.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobSidecarsByRootRequestMessage.java new file mode 100644 index 00000000000..29485ac182f --- /dev/null +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/BlobSidecarsByRootRequestMessage.java @@ -0,0 +1,58 @@ +/* + * Copyright ConsenSys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc; + +import java.util.List; +import tech.pegasys.teku.infrastructure.ssz.SszList; +import tech.pegasys.teku.infrastructure.ssz.impl.SszListImpl; +import tech.pegasys.teku.infrastructure.ssz.schema.impl.AbstractSszListSchema; +import tech.pegasys.teku.infrastructure.ssz.tree.TreeNode; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; + +public class BlobSidecarsByRootRequestMessage extends SszListImpl + implements SszList, RpcRequest { + + public static class BlobSidecarsByRootRequestMessageSchema + extends AbstractSszListSchema { + + public BlobSidecarsByRootRequestMessageSchema(final UInt64 maxLength) { + super(BlobIdentifier.SSZ_SCHEMA, maxLength.longValue()); + } + + @Override + public BlobSidecarsByRootRequestMessage createFromBackingNode(final TreeNode node) { + return new BlobSidecarsByRootRequestMessage(this, node); + } + } + + public BlobSidecarsByRootRequestMessage( + final BlobSidecarsByRootRequestMessageSchema schema, final List identifiers) { + super(schema, schema.createTreeFromElements(identifiers)); + } + + private BlobSidecarsByRootRequestMessage( + final BlobSidecarsByRootRequestMessageSchema schema, final TreeNode node) { + super(schema, node); + } + + @Override + public int getMaximumRequestChunks() { + return size(); + } + + @Override + public String toString() { + return "BlobSidecarsByRootRequestMessage{" + super.toString() + "}"; + } +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/SafeFuture.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/SafeFuture.java index 6b6677eb342..216aaefa690 100644 --- a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/SafeFuture.java +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/SafeFuture.java @@ -516,6 +516,18 @@ public SafeFuture thenAcceptAsync( return (SafeFuture) super.thenAcceptAsync(action, executor); } + public SafeFuture thenAcceptChecked(final ExceptionThrowingConsumer action) { + return thenCompose( + value -> { + try { + action.accept(value); + return SafeFuture.COMPLETE; + } catch (final Throwable e) { + return SafeFuture.failedFuture(e); + } + }); + } + @SuppressWarnings("unchecked") @Override public SafeFuture thenCombine( diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManager.java index 3f0fa50bf58..80293e167c6 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManager.java @@ -57,6 +57,9 @@ public static BlobSidecarGossipManager create( final SpecVersion forkSpecVersion = spec.atEpoch(forkInfo.getFork().getEpoch()); final int maxBlobsPerBlock = SpecConfigDeneb.required(forkSpecVersion.getConfig()).getMaxBlobsPerBlock(); + final SignedBlobSidecarSchema gossipType = + SchemaDefinitionsDeneb.required(forkSpecVersion.getSchemaDefinitions()) + .getSignedBlobSidecarSchema(); final Int2ObjectMap> indexToTopicHandler = new Int2ObjectOpenHashMap<>(); IntStream.range(0, maxBlobsPerBlock) @@ -71,8 +74,7 @@ public static BlobSidecarGossipManager create( processor, gossipEncoding, forkInfo, - SchemaDefinitionsDeneb.required(forkSpecVersion.getSchemaDefinitions()) - .getSignedBlobSidecarSchema(), + gossipType, maxMessageSize); indexToTopicHandler.put(index, topicHandler); }); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DefaultEth2Peer.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DefaultEth2Peer.java index ae300e2285d..76ebdc6f99d 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DefaultEth2Peer.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DefaultEth2Peer.java @@ -18,12 +18,12 @@ import static tech.pegasys.teku.spec.config.Constants.MAX_REQUEST_BLOCKS; import com.google.common.base.MoreObjects; -import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.tuweni.bytes.Bytes32; @@ -49,6 +49,7 @@ import tech.pegasys.teku.spec.config.SpecConfigDeneb; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBeaconBlockAndBlobsSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBlobSidecar; import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlockAndBlobsSidecarByRootRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRangeRequestMessage; @@ -243,6 +244,13 @@ public SafeFuture requestBlockAndBlobsSidecarByRoot( "BlockAndBlobsSidecarByRoot method is not available"))); } + @Override + public SafeFuture requestBlobSidecarsByRoot( + final List blockRoots, final RpcResponseListener listener) + throws RpcException { + throw new UnsupportedOperationException("Not yet implemented"); + } + @Override public SafeFuture> requestBlockBySlot(final UInt64 slot) { final Eth2RpcMethod blocksByRange = diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2Peer.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2Peer.java index 29d07ceaa7a..fdf77e44f8c 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2Peer.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2Peer.java @@ -31,6 +31,7 @@ import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBeaconBlockAndBlobsSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBlobSidecar; import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.RpcRequest; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessage; @@ -91,6 +92,10 @@ SafeFuture requestBlockAndBlobsSidecarByRoot( List blockRoots, RpcResponseListener listener) throws RpcException; + SafeFuture requestBlobSidecarsByRoot( + List blockRoots, RpcResponseListener listener) + throws RpcException; + SafeFuture> requestBlockBySlot(UInt64 slot); SafeFuture> requestBlockByRoot(Bytes32 blockRoot); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodIds.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodIds.java index 4f35d761c1b..b199b50b532 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodIds.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodIds.java @@ -20,6 +20,7 @@ public class BeaconChainMethodIds { static final String STATUS = "/eth2/beacon_chain/req/status"; static final String GOODBYE = "/eth2/beacon_chain/req/goodbye"; static final String BEACON_BLOCKS_BY_ROOT = "/eth2/beacon_chain/req/beacon_blocks_by_root"; + static final String BLOB_SIDECARS_BY_ROOT = "/eth2/beacon_chain/req/blob_sidecars_by_root"; static final String BEACON_BLOCKS_BY_RANGE = "/eth2/beacon_chain/req/beacon_blocks_by_range"; static final String BEACON_BLOCK_AND_BLOBS_SIDECAR_BY_ROOT = "/eth2/beacon_chain/req/beacon_block_and_blobs_sidecar_by_root"; @@ -45,6 +46,11 @@ public static String getBeaconBlockAndBlobsSidecarByRoot( return getMethodId(BEACON_BLOCK_AND_BLOBS_SIDECAR_BY_ROOT, version, encoding); } + public static String getBlobSidecarsByRootMethodId( + final int version, final RpcEncoding encoding) { + return getMethodId(BLOB_SIDECARS_BY_ROOT, version, encoding); + } + public static String getBlobsSidecarsByRangeMethodId( final int version, final RpcEncoding encoding) { return getMethodId(BLOBS_SIDECARS_BY_RANGE, version, encoding); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java index fb65b095527..14e5a2c96cb 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java @@ -15,6 +15,7 @@ import static tech.pegasys.teku.spec.config.Constants.MAX_BLOCK_BY_RANGE_REQUEST_SIZE; import static tech.pegasys.teku.spec.config.Constants.MAX_REQUEST_BLOBS_SIDECARS; +import static tech.pegasys.teku.spec.config.Constants.MAX_REQUEST_BLOB_SIDECARS; import java.util.ArrayList; import java.util.Collection; @@ -30,6 +31,7 @@ import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BeaconBlockAndBlobsSidecarByRootMessageHandler; import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BeaconBlocksByRangeMessageHandler; import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BeaconBlocksByRootMessageHandler; +import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BlobSidecarsByRootMessageHandler; import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BlobsSidecarsByRangeMessageHandler; import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.GoodbyeMessageHandler; import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.MetadataMessageHandler; @@ -46,8 +48,10 @@ import tech.pegasys.teku.networking.p2p.rpc.RpcMethod; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.SpecMilestone; +import tech.pegasys.teku.spec.config.SpecConfigDeneb; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBeaconBlockAndBlobsSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBlobSidecar; import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlockAndBlobsSidecarByRootRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlockAndBlobsSidecarByRootRequestMessage.BeaconBlockAndBlobsSidecarByRootRequestMessageSchema; @@ -55,6 +59,8 @@ import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRangeRequestMessage.BeaconBlocksByRangeRequestMessageSchema; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRootRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRootRequestMessage.BeaconBlocksByRootRequestMessageSchema; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessage; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessage.BlobSidecarsByRootRequestMessageSchema; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobsSidecarsByRangeRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobsSidecarsByRangeRequestMessage.BlobsSidecarsByRangeRequestMessageSchema; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.EmptyMessage; @@ -80,6 +86,8 @@ public class BeaconChainMethods { beaconBlockAndBlobsSidecarByRoot; private final Optional> blobsSidecarsByRange; + private final Optional> + blobSidecarsByRoot; private final Eth2RpcMethod getMetadata; private final Eth2RpcMethod ping; @@ -96,6 +104,8 @@ private BeaconChainMethods( beaconBlockAndBlobsSidecarByRoot, final Optional> blobsSidecarsByRange, + final Optional> + blobSidecarsByRoot, final Eth2RpcMethod getMetadata, final Eth2RpcMethod ping) { this.status = status; @@ -104,11 +114,13 @@ private BeaconChainMethods( this.beaconBlocksByRange = beaconBlocksByRange; this.beaconBlockAndBlobsSidecarByRoot = beaconBlockAndBlobsSidecarByRoot; this.blobsSidecarsByRange = blobsSidecarsByRange; + this.blobSidecarsByRoot = blobSidecarsByRoot; this.getMetadata = getMetadata; this.ping = ping; this.allMethods = new ArrayList<>( List.of(status, goodBye, beaconBlocksByRoot, beaconBlocksByRange, getMetadata, ping)); + blobSidecarsByRoot.ifPresent(allMethods::add); beaconBlockAndBlobsSidecarByRoot().ifPresent(allMethods::add); blobsSidecarsByRange.ifPresent(allMethods::add); } @@ -146,6 +158,8 @@ public static BeaconChainMethods create( peerLookup, rpcEncoding, recentChainData), + createBlobSidecarsByRoot( + spec, asyncRunner, combinedChainDataClient, peerLookup, rpcEncoding, recentChainData), createMetadata(spec, asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding), createPing(asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding)); } @@ -302,6 +316,48 @@ private static Eth2RpcMethod createGoodBye( peerLookup)); } + private static Optional> + createBlobSidecarsByRoot( + final Spec spec, + final AsyncRunner asyncRunner, + final CombinedChainDataClient combinedChainDataClient, + final PeerLookup peerLookup, + final RpcEncoding rpcEncoding, + final RecentChainData recentChainData) { + if (!spec.isMilestoneSupported(SpecMilestone.DENEB)) { + return Optional.empty(); + } + + final RpcContextCodec forkDigestContextCodec = + RpcContextCodec.forkDigest( + spec, recentChainData, ForkDigestPayloadContext.SIGNED_BLOB_SIDECAR); + + final int maxBlobsPerBlock = + SpecConfigDeneb.required(spec.forMilestone(SpecMilestone.DENEB).getConfig()) + .getMaxBlobsPerBlock(); + + final UInt64 maxRequestSize = MAX_REQUEST_BLOB_SIDECARS.times(maxBlobsPerBlock); + + final BlobSidecarsByRootRequestMessageSchema requestType = + new BlobSidecarsByRootRequestMessageSchema(maxRequestSize); + + final BlobSidecarsByRootMessageHandler blobSidecarsByRootHandler = + new BlobSidecarsByRootMessageHandler( + spec, getDenebForkEpoch(spec), combinedChainDataClient, maxRequestSize); + + return Optional.of( + new SingleProtocolEth2RpcMethod<>( + asyncRunner, + BeaconChainMethodIds.BLOB_SIDECARS_BY_ROOT, + 1, + rpcEncoding, + requestType, + true, + forkDigestContextCodec, + blobSidecarsByRootHandler, + peerLookup)); + } + private static Optional> createBlobsSidecarsByRange( final Spec spec, @@ -462,6 +518,11 @@ public Eth2RpcMethod beaco return blobsSidecarsByRange; } + public Optional> + blobSidecarsByRoot() { + return blobSidecarsByRoot; + } + public Eth2RpcMethod getMetadata() { return getMetadata; } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobSidecarsByRootMessageHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobSidecarsByRootMessageHandler.java new file mode 100644 index 00000000000..034dd58a68d --- /dev/null +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobSidecarsByRootMessageHandler.java @@ -0,0 +1,161 @@ +/* + * Copyright ConsenSys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods; + +import static tech.pegasys.teku.spec.config.Constants.MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS; + +import com.google.common.base.Throwables; +import java.nio.channels.ClosedChannelException; +import java.util.Optional; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.networking.eth2.peers.Eth2Peer; +import tech.pegasys.teku.networking.eth2.rpc.core.PeerRequiredLocalMessageHandler; +import tech.pegasys.teku.networking.eth2.rpc.core.ResponseCallback; +import tech.pegasys.teku.networking.eth2.rpc.core.RpcException; +import tech.pegasys.teku.networking.eth2.rpc.core.RpcException.ResourceUnavailableException; +import tech.pegasys.teku.networking.p2p.rpc.StreamClosedException; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBlobSidecar; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessage; +import tech.pegasys.teku.spec.datastructures.state.AnchorPoint; +import tech.pegasys.teku.storage.client.CombinedChainDataClient; + +/** + * BlobSidecarsByRoot + * v1 + */ +public class BlobSidecarsByRootMessageHandler + extends PeerRequiredLocalMessageHandler { + + private static final Logger LOG = LogManager.getLogger(); + + private final Spec spec; + private final UInt64 denebForkEpoch; + private final CombinedChainDataClient combinedChainDataClient; + private final UInt64 maxRequestSize; + + public BlobSidecarsByRootMessageHandler( + final Spec spec, + final UInt64 denebForkEpoch, + final CombinedChainDataClient combinedChainDataClient, + final UInt64 maxRequestSize) { + this.spec = spec; + this.denebForkEpoch = denebForkEpoch; + this.combinedChainDataClient = combinedChainDataClient; + this.maxRequestSize = maxRequestSize; + } + + @Override + public void onIncomingMessage( + final String protocolId, + final Eth2Peer peer, + final BlobSidecarsByRootRequestMessage message, + final ResponseCallback callback) { + + LOG.trace("Peer {} requested BlobSidecars with blob identifiers: {}", peer.getId(), message); + + // TODO: implement rate limiting + + SafeFuture future = SafeFuture.COMPLETE; + + final UInt64 finalizedEpoch = + combinedChainDataClient.getLatestFinalized().map(AnchorPoint::getEpoch).orElse(UInt64.ZERO); + + for (final BlobIdentifier identifier : message) { + final Bytes32 blockRoot = identifier.getBlockRoot(); + future = + future + .thenCompose(__ -> validateRequestedBlockRoot(blockRoot, finalizedEpoch)) + .thenCompose(__ -> retrieveBlobSidecar(identifier)) + .thenComposeChecked( + maybeSidecar -> { + if (maybeSidecar.isEmpty()) { + throw new ResourceUnavailableException( + String.format( + "Blob sidecar for block root (%s) was not available", blockRoot)); + } + return callback.respond(maybeSidecar.get()); + }); + } + + future.finish(callback::completeSuccessfully, err -> handleError(callback, err)); + } + + /** + * Validations: + * + *
    + *
  • A block for the block root is available. + *
  • The block root references a block greater than or equal to the minimum_request_epoch + *
+ */ + private SafeFuture validateRequestedBlockRoot( + final Bytes32 blockRoot, final UInt64 finalizedEpoch) { + return combinedChainDataClient + .getBlockByBlockRoot(blockRoot) + .thenAcceptChecked( + maybeBlock -> { + if (maybeBlock.isEmpty()) { + throw new ResourceUnavailableException( + String.format("Block for block root (%s) couldn't be retrieved", blockRoot)); + } + final SignedBeaconBlock block = maybeBlock.get(); + final UInt64 requestedEpoch = spec.computeEpochAtSlot(block.getSlot()); + final UInt64 minimumRequestEpoch = computeMinimumRequestEpoch(finalizedEpoch); + if (requestedEpoch.isLessThan(minimumRequestEpoch)) { + throw new ResourceUnavailableException( + String.format( + "Block root (%s) references a block earlier than the minimum_request_epoch (%s)", + blockRoot, minimumRequestEpoch)); + } + }); + } + + private UInt64 computeMinimumRequestEpoch(final UInt64 finalizedEpoch) { + final UInt64 currentEpoch = combinedChainDataClient.getCurrentEpoch(); + return finalizedEpoch + .max(currentEpoch.minusMinZero(MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS)) + .max(denebForkEpoch); + } + + private SafeFuture> retrieveBlobSidecar( + final BlobIdentifier identifier) { + return combinedChainDataClient.getBlobSidecarByBlockRootAndIndex( + identifier.getBlockRoot(), identifier.getIndex()); + } + + private void handleError( + final ResponseCallback callback, final Throwable error) { + final Throwable rootCause = Throwables.getRootCause(error); + if (rootCause instanceof RpcException) { + LOG.trace("Rejecting blob sidecars by root request", error); + callback.completeWithErrorResponse((RpcException) rootCause); + } else { + if (rootCause instanceof StreamClosedException + || rootCause instanceof ClosedChannelException) { + LOG.trace("Stream closed while sending requested blob sidecars", error); + } else { + LOG.error("Failed to process blob sidecars by root request", error); + } + callback.completeWithUnexpectedError(error); + } + } +} diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/encodings/context/ForkDigestPayloadContext.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/encodings/context/ForkDigestPayloadContext.java index 3f5762fa692..1a8330f85e6 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/encodings/context/ForkDigestPayloadContext.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/encodings/context/ForkDigestPayloadContext.java @@ -18,6 +18,7 @@ import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBeaconBlockAndBlobsSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBlobSidecar; import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar; import tech.pegasys.teku.spec.schemas.SchemaDefinitions; @@ -51,6 +52,20 @@ public SszSchema getSchemaFromSchemaDefinitions( } }; + ForkDigestPayloadContext SIGNED_BLOB_SIDECAR = + new ForkDigestPayloadContext<>() { + @Override + public UInt64 getSlotFromPayload(final SignedBlobSidecar responsePayload) { + return responsePayload.getBlobSidecar().getSlot(); + } + + @Override + public SszSchema getSchemaFromSchemaDefinitions( + final SchemaDefinitions schemaDefinitions) { + return schemaDefinitions.toVersionDeneb().orElseThrow().getSignedBlobSidecarSchema(); + } + }; + ForkDigestPayloadContext SIGNED_BEACON_BLOCK_AND_BLOBS_SIDECAR = new ForkDigestPayloadContext<>() { @Override diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobSidecarsByRootMessageHandlerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobSidecarsByRootMessageHandlerTest.java new file mode 100644 index 00000000000..aef5cc3bf27 --- /dev/null +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/BlobSidecarsByRootMessageHandlerTest.java @@ -0,0 +1,73 @@ +/* + * Copyright ConsenSys Software Inc., 2023 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods; + +import static org.mockito.Mockito.mock; +import static tech.pegasys.teku.spec.config.Constants.MAX_CHUNK_SIZE_BELLATRIX; + +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.networking.eth2.rpc.beaconchain.BeaconChainMethodIds; +import tech.pegasys.teku.networking.eth2.rpc.core.ResponseCallback; +import tech.pegasys.teku.networking.eth2.rpc.core.encodings.RpcEncoding; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBlobSidecar; +import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar; +import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.storage.client.CombinedChainDataClient; + +// TODO: implement tests +public class BlobSidecarsByRootMessageHandlerTest { + + private static final RpcEncoding RPC_ENCODING = + RpcEncoding.createSszSnappyEncoding(MAX_CHUNK_SIZE_BELLATRIX); + + private final String protocolId = + BeaconChainMethodIds.getBlobSidecarsByRootMethodId(1, RPC_ENCODING); + + private final UInt64 denebForkEpoch = UInt64.valueOf(1); + + private final Spec spec = TestSpecFactory.createMinimalWithDenebForkEpoch(denebForkEpoch); + + private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); + + @SuppressWarnings("unchecked") + private final ResponseCallback listener = mock(ResponseCallback.class); + + private final CombinedChainDataClient combinedChainDataClient = + mock(CombinedChainDataClient.class); + + private final UInt64 maxRequestSize = UInt64.valueOf(8); + + BlobSidecarsByRootMessageHandler handler = + new BlobSidecarsByRootMessageHandler( + spec, denebForkEpoch, combinedChainDataClient, maxRequestSize); + + @SuppressWarnings("unchecked") + final ResponseCallback callback = mock(ResponseCallback.class); + + @Test + public void shouldSendResourceUnavailableIfBlockForBlockRootIsNotAvailable() {} + + @Test + public void + shouldSendResourceUnavailableIfBlockRootReferencesBlockEarlierThanTheMinimumRequestEpoch() {} + + @Test + public void shouldSendResourceUnavailableIfBlobSidecarIsNotAvailable() {} + + @Test + public void shouldSendToPeerRequestedBlobSidecars() {} +} diff --git a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/RespondingEth2Peer.java b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/RespondingEth2Peer.java index 4e820cc045f..32422db9387 100644 --- a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/RespondingEth2Peer.java +++ b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/RespondingEth2Peer.java @@ -32,6 +32,7 @@ import tech.pegasys.teku.infrastructure.subscribers.Subscribers; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.rpc.core.ResponseCallback; +import tech.pegasys.teku.networking.eth2.rpc.core.RpcException; import tech.pegasys.teku.networking.eth2.rpc.core.methods.Eth2RpcMethod; import tech.pegasys.teku.networking.p2p.mock.MockNodeIdGenerator; import tech.pegasys.teku.networking.p2p.network.PeerAddress; @@ -51,6 +52,7 @@ import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState; import tech.pegasys.teku.spec.datastructures.blocks.StateAndBlockSummary; import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBeaconBlockAndBlobsSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBlobSidecar; import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.RpcRequest; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessage; @@ -259,6 +261,13 @@ public SafeFuture requestBlockAndBlobsSidecarByRoot( return createPendingBlockAndBlobsSidecarRequest(handler); } + @Override + public SafeFuture requestBlobSidecarsByRoot( + final List blockRoots, final RpcResponseListener listener) + throws RpcException { + throw new UnsupportedOperationException("Not yet implemented"); + } + @Override public SafeFuture> requestBlockBySlot(final UInt64 slot) { final PendingRequestHandler, SignedBeaconBlock> handler = diff --git a/storage/src/main/java/tech/pegasys/teku/storage/client/CombinedChainDataClient.java b/storage/src/main/java/tech/pegasys/teku/storage/client/CombinedChainDataClient.java index 12af7a9759d..953f508fa23 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/client/CombinedChainDataClient.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/client/CombinedChainDataClient.java @@ -37,6 +37,7 @@ import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState; import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; import tech.pegasys.teku.spec.datastructures.blocks.StateAndBlockSummary; +import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBlobSidecar; import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar; import tech.pegasys.teku.spec.datastructures.forkchoice.ReadOnlyForkChoiceStrategy; import tech.pegasys.teku.spec.datastructures.forkchoice.ReadOnlyStore; @@ -483,6 +484,11 @@ public SafeFuture> getBlobsSidecarBySlotAndBlockRoot( return historicalChainData.getBlobsSidecar(new SlotAndBlockRoot(slot, blockRoot)); } + public SafeFuture> getBlobSidecarByBlockRootAndIndex( + final Bytes32 blockRoot, final UInt64 index) { + throw new UnsupportedOperationException("Not yet implemented"); + } + private boolean isRecentData(final UInt64 slot) { checkNotNull(slot); if (recentChainData.isPreGenesis()) {