Skip to content

Commit

Permalink
Implement BlobSidecarsByRoot RPC method
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Feb 21, 2023
1 parent 596a9ac commit c4d09c7
Show file tree
Hide file tree
Showing 13 changed files with 476 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright ConsenSys Software Inc., 2023
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc;

import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.infrastructure.ssz.containers.Container2;
import tech.pegasys.teku.infrastructure.ssz.containers.ContainerSchema2;
import tech.pegasys.teku.infrastructure.ssz.primitive.SszBytes32;
import tech.pegasys.teku.infrastructure.ssz.primitive.SszUInt64;
import tech.pegasys.teku.infrastructure.ssz.schema.SszPrimitiveSchemas;
import tech.pegasys.teku.infrastructure.ssz.tree.TreeNode;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;

public class BlobIdentifier extends Container2<BlobIdentifier, SszBytes32, SszUInt64> {

public static class BlobIdentifierSchema
extends ContainerSchema2<BlobIdentifier, SszBytes32, SszUInt64> {

private BlobIdentifierSchema() {
super(SszPrimitiveSchemas.BYTES32_SCHEMA, SszPrimitiveSchemas.UINT64_SCHEMA);
}

@Override
public BlobIdentifier createFromBackingNode(final TreeNode node) {
return new BlobIdentifier(node);
}
}

public static final BlobIdentifierSchema SSZ_SCHEMA = new BlobIdentifierSchema();

private BlobIdentifier(final TreeNode node) {
super(SSZ_SCHEMA, node);
}

public BlobIdentifier(final Bytes32 root, final UInt64 index) {
super(SSZ_SCHEMA, SszBytes32.of(root), SszUInt64.of(index));
}

public Bytes32 getBlockRoot() {
return getField0().get();
}

public UInt64 getIndex() {
return getField1().get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright ConsenSys Software Inc., 2022
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc;

import java.util.List;
import tech.pegasys.teku.infrastructure.ssz.SszList;
import tech.pegasys.teku.infrastructure.ssz.impl.SszListImpl;
import tech.pegasys.teku.infrastructure.ssz.schema.impl.AbstractSszListSchema;
import tech.pegasys.teku.infrastructure.ssz.tree.TreeNode;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;

public class BlobSidecarsByRootRequestMessage extends SszListImpl<BlobIdentifier>
implements SszList<BlobIdentifier>, RpcRequest {

public static class BlobSidecarsByRootRequestMessageSchema
extends AbstractSszListSchema<BlobIdentifier, BlobSidecarsByRootRequestMessage> {

public BlobSidecarsByRootRequestMessageSchema(final UInt64 maxLength) {
super(BlobIdentifier.SSZ_SCHEMA, maxLength.longValue());
}

@Override
public BlobSidecarsByRootRequestMessage createFromBackingNode(final TreeNode node) {
return new BlobSidecarsByRootRequestMessage(this, node);
}
}

public BlobSidecarsByRootRequestMessage(
final BlobSidecarsByRootRequestMessageSchema schema, final List<BlobIdentifier> identifiers) {
super(schema, schema.createTreeFromElements(identifiers));
}

private BlobSidecarsByRootRequestMessage(
final BlobSidecarsByRootRequestMessageSchema schema, final TreeNode node) {
super(schema, node);
}

@Override
public int getMaximumRequestChunks() {
return size();
}

@Override
public String toString() {
return "BlobSidecarsByRootRequestMessage{" + super.toString() + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,18 @@ public SafeFuture<Void> thenAcceptAsync(
return (SafeFuture<Void>) super.thenAcceptAsync(action, executor);
}

public SafeFuture<Void> thenAcceptChecked(final ExceptionThrowingConsumer<? super T> action) {
return thenCompose(
value -> {
try {
action.accept(value);
return SafeFuture.COMPLETE;
} catch (final Throwable e) {
return SafeFuture.failedFuture(e);
}
});
}

@SuppressWarnings("unchecked")
@Override
public <U, V> SafeFuture<V> thenCombine(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public static BlobSidecarGossipManager create(
final SpecVersion forkSpecVersion = spec.atEpoch(forkInfo.getFork().getEpoch());
final int maxBlobsPerBlock =
SpecConfigDeneb.required(forkSpecVersion.getConfig()).getMaxBlobsPerBlock();
final SignedBlobSidecarSchema gossipType =
SchemaDefinitionsDeneb.required(forkSpecVersion.getSchemaDefinitions())
.getSignedBlobSidecarSchema();
final Int2ObjectMap<Eth2TopicHandler<SignedBlobSidecar>> indexToTopicHandler =
new Int2ObjectOpenHashMap<>();
IntStream.range(0, maxBlobsPerBlock)
Expand All @@ -71,8 +74,7 @@ public static BlobSidecarGossipManager create(
processor,
gossipEncoding,
forkInfo,
SchemaDefinitionsDeneb.required(forkSpecVersion.getSchemaDefinitions())
.getSignedBlobSidecarSchema(),
gossipType,
maxMessageSize);
indexToTopicHandler.put(index, topicHandler);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
import static tech.pegasys.teku.spec.config.Constants.MAX_REQUEST_BLOCKS;

import com.google.common.base.MoreObjects;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
Expand All @@ -49,6 +49,7 @@
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBeaconBlockAndBlobsSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBlobSidecar;
import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlockAndBlobsSidecarByRootRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRangeRequestMessage;
Expand Down Expand Up @@ -243,6 +244,13 @@ public SafeFuture<Void> requestBlockAndBlobsSidecarByRoot(
"BlockAndBlobsSidecarByRoot method is not available")));
}

@Override
public SafeFuture<Void> requestBlobSidecarsByRoot(
final List<Bytes32> blockRoots, final RpcResponseListener<SignedBlobSidecar> listener)
throws RpcException {
throw new UnsupportedOperationException("Not yet implemented");
}

@Override
public SafeFuture<Optional<SignedBeaconBlock>> requestBlockBySlot(final UInt64 slot) {
final Eth2RpcMethod<BeaconBlocksByRangeRequestMessage, SignedBeaconBlock> blocksByRange =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBeaconBlockAndBlobsSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBlobSidecar;
import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.RpcRequest;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessage;
Expand Down Expand Up @@ -91,6 +92,10 @@ SafeFuture<Void> requestBlockAndBlobsSidecarByRoot(
List<Bytes32> blockRoots, RpcResponseListener<SignedBeaconBlockAndBlobsSidecar> listener)
throws RpcException;

SafeFuture<Void> requestBlobSidecarsByRoot(
List<Bytes32> blockRoots, RpcResponseListener<SignedBlobSidecar> listener)
throws RpcException;

SafeFuture<Optional<SignedBeaconBlock>> requestBlockBySlot(UInt64 slot);

SafeFuture<Optional<SignedBeaconBlock>> requestBlockByRoot(Bytes32 blockRoot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class BeaconChainMethodIds {
static final String STATUS = "/eth2/beacon_chain/req/status";
static final String GOODBYE = "/eth2/beacon_chain/req/goodbye";
static final String BEACON_BLOCKS_BY_ROOT = "/eth2/beacon_chain/req/beacon_blocks_by_root";
static final String BLOB_SIDECARS_BY_ROOT = "/eth2/beacon_chain/req/blob_sidecars_by_root";
static final String BEACON_BLOCKS_BY_RANGE = "/eth2/beacon_chain/req/beacon_blocks_by_range";
static final String BEACON_BLOCK_AND_BLOBS_SIDECAR_BY_ROOT =
"/eth2/beacon_chain/req/beacon_block_and_blobs_sidecar_by_root";
Expand All @@ -45,6 +46,11 @@ public static String getBeaconBlockAndBlobsSidecarByRoot(
return getMethodId(BEACON_BLOCK_AND_BLOBS_SIDECAR_BY_ROOT, version, encoding);
}

public static String getBlobSidecarsByRootMethodId(
final int version, final RpcEncoding encoding) {
return getMethodId(BLOB_SIDECARS_BY_ROOT, version, encoding);
}

public static String getBlobsSidecarsByRangeMethodId(
final int version, final RpcEncoding encoding) {
return getMethodId(BLOBS_SIDECARS_BY_RANGE, version, encoding);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static tech.pegasys.teku.spec.config.Constants.MAX_BLOCK_BY_RANGE_REQUEST_SIZE;
import static tech.pegasys.teku.spec.config.Constants.MAX_REQUEST_BLOBS_SIDECARS;
import static tech.pegasys.teku.spec.config.Constants.MAX_REQUEST_BLOB_SIDECARS;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -30,6 +31,7 @@
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BeaconBlockAndBlobsSidecarByRootMessageHandler;
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BeaconBlocksByRangeMessageHandler;
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BeaconBlocksByRootMessageHandler;
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BlobSidecarsByRootMessageHandler;
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BlobsSidecarsByRangeMessageHandler;
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.GoodbyeMessageHandler;
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.MetadataMessageHandler;
Expand All @@ -46,15 +48,19 @@
import tech.pegasys.teku.networking.p2p.rpc.RpcMethod;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBeaconBlockAndBlobsSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBlobSidecar;
import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlockAndBlobsSidecarByRootRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlockAndBlobsSidecarByRootRequestMessage.BeaconBlockAndBlobsSidecarByRootRequestMessageSchema;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRangeRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRangeRequestMessage.BeaconBlocksByRangeRequestMessageSchema;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRootRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRootRequestMessage.BeaconBlocksByRootRequestMessageSchema;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessage.BlobSidecarsByRootRequestMessageSchema;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobsSidecarsByRangeRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobsSidecarsByRangeRequestMessage.BlobsSidecarsByRangeRequestMessageSchema;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.EmptyMessage;
Expand All @@ -80,6 +86,8 @@ public class BeaconChainMethods {
beaconBlockAndBlobsSidecarByRoot;
private final Optional<Eth2RpcMethod<BlobsSidecarsByRangeRequestMessage, BlobsSidecar>>
blobsSidecarsByRange;
private final Optional<Eth2RpcMethod<BlobSidecarsByRootRequestMessage, SignedBlobSidecar>>
blobSidecarsByRoot;
private final Eth2RpcMethod<EmptyMessage, MetadataMessage> getMetadata;
private final Eth2RpcMethod<PingMessage, PingMessage> ping;

Expand All @@ -96,6 +104,8 @@ private BeaconChainMethods(
beaconBlockAndBlobsSidecarByRoot,
final Optional<Eth2RpcMethod<BlobsSidecarsByRangeRequestMessage, BlobsSidecar>>
blobsSidecarsByRange,
final Optional<Eth2RpcMethod<BlobSidecarsByRootRequestMessage, SignedBlobSidecar>>
blobSidecarsByRoot,
final Eth2RpcMethod<EmptyMessage, MetadataMessage> getMetadata,
final Eth2RpcMethod<PingMessage, PingMessage> ping) {
this.status = status;
Expand All @@ -104,11 +114,13 @@ private BeaconChainMethods(
this.beaconBlocksByRange = beaconBlocksByRange;
this.beaconBlockAndBlobsSidecarByRoot = beaconBlockAndBlobsSidecarByRoot;
this.blobsSidecarsByRange = blobsSidecarsByRange;
this.blobSidecarsByRoot = blobSidecarsByRoot;
this.getMetadata = getMetadata;
this.ping = ping;
this.allMethods =
new ArrayList<>(
List.of(status, goodBye, beaconBlocksByRoot, beaconBlocksByRange, getMetadata, ping));
blobSidecarsByRoot.ifPresent(allMethods::add);
beaconBlockAndBlobsSidecarByRoot().ifPresent(allMethods::add);
blobsSidecarsByRange.ifPresent(allMethods::add);
}
Expand Down Expand Up @@ -146,6 +158,8 @@ public static BeaconChainMethods create(
peerLookup,
rpcEncoding,
recentChainData),
createBlobSidecarsByRoot(
spec, asyncRunner, combinedChainDataClient, peerLookup, rpcEncoding, recentChainData),
createMetadata(spec, asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding),
createPing(asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding));
}
Expand Down Expand Up @@ -302,6 +316,48 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
peerLookup));
}

private static Optional<Eth2RpcMethod<BlobSidecarsByRootRequestMessage, SignedBlobSidecar>>
createBlobSidecarsByRoot(
final Spec spec,
final AsyncRunner asyncRunner,
final CombinedChainDataClient combinedChainDataClient,
final PeerLookup peerLookup,
final RpcEncoding rpcEncoding,
final RecentChainData recentChainData) {
if (!spec.isMilestoneSupported(SpecMilestone.DENEB)) {
return Optional.empty();
}

final RpcContextCodec<Bytes4, SignedBlobSidecar> forkDigestContextCodec =
RpcContextCodec.forkDigest(
spec, recentChainData, ForkDigestPayloadContext.SIGNED_BLOB_SIDECAR);

final int maxBlobsPerBlock =
SpecConfigDeneb.required(spec.forMilestone(SpecMilestone.DENEB).getConfig())
.getMaxBlobsPerBlock();

final UInt64 maxRequestSize = MAX_REQUEST_BLOB_SIDECARS.times(maxBlobsPerBlock);

final BlobSidecarsByRootRequestMessageSchema requestType =
new BlobSidecarsByRootRequestMessageSchema(maxRequestSize);

final BlobSidecarsByRootMessageHandler blobSidecarsByRootHandler =
new BlobSidecarsByRootMessageHandler(
spec, getDenebForkEpoch(spec), combinedChainDataClient, maxRequestSize);

return Optional.of(
new SingleProtocolEth2RpcMethod<>(
asyncRunner,
BeaconChainMethodIds.BLOB_SIDECARS_BY_ROOT,
1,
rpcEncoding,
requestType,
true,
forkDigestContextCodec,
blobSidecarsByRootHandler,
peerLookup));
}

private static Optional<Eth2RpcMethod<BlobsSidecarsByRangeRequestMessage, BlobsSidecar>>
createBlobsSidecarsByRange(
final Spec spec,
Expand Down Expand Up @@ -462,6 +518,11 @@ public Eth2RpcMethod<BeaconBlocksByRangeRequestMessage, SignedBeaconBlock> beaco
return blobsSidecarsByRange;
}

public Optional<Eth2RpcMethod<BlobSidecarsByRootRequestMessage, SignedBlobSidecar>>
blobSidecarsByRoot() {
return blobSidecarsByRoot;
}

public Eth2RpcMethod<EmptyMessage, MetadataMessage> getMetadata() {
return getMetadata;
}
Expand Down
Loading

0 comments on commit c4d09c7

Please sign in to comment.