Skip to content

Commit

Permalink
make the request schema static
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Feb 25, 2023
1 parent d2a8644 commit e60436b
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,16 @@
import tech.pegasys.teku.infrastructure.ssz.impl.SszListImpl;
import tech.pegasys.teku.infrastructure.ssz.schema.impl.AbstractSszListSchema;
import tech.pegasys.teku.infrastructure.ssz.tree.TreeNode;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;

public class BlobSidecarsByRootRequestMessage extends SszListImpl<BlobIdentifier>
implements SszList<BlobIdentifier>, RpcRequest {

public static class BlobSidecarsByRootRequestMessageSchema
extends AbstractSszListSchema<BlobIdentifier, BlobSidecarsByRootRequestMessage> {

public BlobSidecarsByRootRequestMessageSchema(final SpecConfigDeneb specConfigDeneb) {
super(
BlobIdentifier.SSZ_SCHEMA,
MAX_REQUEST_BLOB_SIDECARS.times(specConfigDeneb.getMaxBlobsPerBlock()).longValue());
public BlobSidecarsByRootRequestMessageSchema() {
// account for future forks by handling up tp to 128 MAX_BLOBS_PER_BLOCK
super(BlobIdentifier.SSZ_SCHEMA, MAX_REQUEST_BLOB_SIDECARS.times(128).longValue());
}

@Override
Expand All @@ -40,10 +38,11 @@ public BlobSidecarsByRootRequestMessage createFromBackingNode(final TreeNode nod
}
}

public BlobSidecarsByRootRequestMessage(
final BlobSidecarsByRootRequestMessageSchema schema,
final List<BlobIdentifier> blobIdentifiers) {
super(schema, schema.createTreeFromElements(blobIdentifiers));
public static final BlobSidecarsByRootRequestMessageSchema SSZ_SCHEMA =
new BlobSidecarsByRootRequestMessageSchema();

public BlobSidecarsByRootRequestMessage(final List<BlobIdentifier> blobIdentifiers) {
super(SSZ_SCHEMA, SSZ_SCHEMA.createTreeFromElements(blobIdentifiers));
}

private BlobSidecarsByRootRequestMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecarSchema;
import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.ExecutionPayloadHeaderSchemaDeneb;
import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.ExecutionPayloadSchemaDeneb;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessage.BlobSidecarsByRootRequestMessageSchema;
import tech.pegasys.teku.spec.datastructures.operations.SignedBlsToExecutionChangeSchema;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconStateSchema;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.versions.deneb.BeaconStateDeneb;
Expand All @@ -62,7 +61,6 @@ public class SchemaDefinitionsDeneb extends SchemaDefinitionsCapella {

private final BlobSchema blobSchema;
private final BlobSidecarSchema blobSidecarSchema;
private final BlobSidecarsByRootRequestMessageSchema blobSidecarsByRootRequestMessageSchema;
private final BlobsSidecarSchema blobsSidecarSchema;
private final SignedBeaconBlockAndBlobsSidecarSchema signedBeaconBlockAndBlobsSidecarSchema;
private final SignedBlobSidecarSchema signedBlobSidecarSchema;
Expand Down Expand Up @@ -103,8 +101,6 @@ public SchemaDefinitionsDeneb(final SpecConfigDeneb specConfig) {
this.blobSchema = new BlobSchema(specConfig);
this.blobsSidecarSchema = BlobsSidecarSchema.create(specConfig, blobSchema);
this.blobSidecarSchema = BlobSidecarSchema.create(blobSchema);
this.blobSidecarsByRootRequestMessageSchema =
new BlobSidecarsByRootRequestMessageSchema(specConfig);
this.signedBeaconBlockAndBlobsSidecarSchema =
SignedBeaconBlockAndBlobsSidecarSchema.create(signedBeaconBlockSchema, blobsSidecarSchema);
this.signedBlobSidecarSchema = SignedBlobSidecarSchema.create(blobSidecarSchema);
Expand Down Expand Up @@ -187,10 +183,6 @@ public BlobSidecarSchema getBlobSidecarSchema() {
return blobSidecarSchema;
}

public BlobSidecarsByRootRequestMessageSchema getBlobSidecarsByRootRequestMessageSchema() {
return blobSidecarsByRootRequestMessageSchema;
}

public SignedBeaconBlockAndBlobsSidecarSchema getSignedBeaconBlockAndBlobsSidecarSchema() {
return signedBeaconBlockAndBlobsSidecarSchema;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRootRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessage.BlobSidecarsByRootRequestMessageSchema;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobsSidecarsByRangeRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.EmptyMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.GoodbyeMessage;
Expand All @@ -66,7 +65,6 @@
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.StatusMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessage;
import tech.pegasys.teku.spec.datastructures.state.Checkpoint;
import tech.pegasys.teku.spec.schemas.SchemaDefinitionsDeneb;

class DefaultEth2Peer extends DelegatingPeer implements Eth2Peer {
private static final Logger LOG = LogManager.getLogger();
Expand All @@ -86,8 +84,6 @@ class DefaultEth2Peer extends DelegatingPeer implements Eth2Peer {
private final RateTracker blobsSidecarsRequestTracker;
private final RateTracker requestTracker;
private final Supplier<UInt64> firstSlotSupportingBlobsSidecarsByRange;
private final Supplier<BlobSidecarsByRootRequestMessageSchema>
blobSidecarsByRootRequestMessageSchema;

DefaultEth2Peer(
final Spec spec,
Expand Down Expand Up @@ -115,12 +111,6 @@ class DefaultEth2Peer extends DelegatingPeer implements Eth2Peer {
.getDenebForkEpoch();
return spec.computeStartSlotAtEpoch(denebForkEpoch);
});
this.blobSidecarsByRootRequestMessageSchema =
Suppliers.memoize(
() ->
SchemaDefinitionsDeneb.required(
spec.forMilestone(SpecMilestone.DENEB).getSchemaDefinitions())
.getBlobSidecarsByRootRequestMessageSchema());
}

@Override
Expand Down Expand Up @@ -257,25 +247,14 @@ public SafeFuture<Void> requestBlockAndBlobsSidecarByRoot(

@Override
public SafeFuture<Void> requestBlobSidecarsByRoot(
final List<BlobIdentifier> blobIdentifiers, final RpcResponseListener<BlobSidecar> listener)
throws RpcException {
final Optional<Eth2RpcMethod<BlobSidecarsByRootRequestMessage, BlobSidecar>> rpcMethod =
rpcMethods.blobSidecarsByRoot();
if (rpcMethod.isEmpty()) {
return failWithUnsupportedMethodException("BlobSidecarsByRoot");
}
final BlobSidecarsByRootRequestMessageSchema requestSchema =
blobSidecarsByRootRequestMessageSchema.get();
final long requestMaxLength = requestSchema.getMaxLength();
if (blobIdentifiers.size() > requestMaxLength) {
throw new RpcException(
INVALID_REQUEST_CODE,
"Only a maximum of " + requestMaxLength + " blob sidecars per request");
}
return requestStream(
rpcMethod.get(),
new BlobSidecarsByRootRequestMessage(requestSchema, blobIdentifiers),
listener);
final List<BlobIdentifier> blobIdentifiers, final RpcResponseListener<BlobSidecar> listener) {
return rpcMethods
.blobSidecarsByRoot()
.map(
method ->
requestStream(
method, new BlobSidecarsByRootRequestMessage(blobIdentifiers), listener))
.orElse(failWithUnsupportedMethodException("BlobSidecarsByRoot"));
}

@Override
Expand Down Expand Up @@ -305,7 +284,6 @@ public SafeFuture<Optional<BlobSidecar>> requestBlobSidecarByRoot(
requestOptionalItem(
method,
new BlobSidecarsByRootRequestMessage(
blobSidecarsByRootRequestMessageSchema.get(),
Collections.singletonList(blobIdentifier))))
.orElse(failWithUnsupportedMethodException("BlobSidecarsByRoot"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRootRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BeaconBlocksByRootRequestMessage.BeaconBlocksByRootRequestMessageSchema;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessage.BlobSidecarsByRootRequestMessageSchema;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobsSidecarsByRangeRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobsSidecarsByRangeRequestMessage.BlobsSidecarsByRangeRequestMessageSchema;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.EmptyMessage;
Expand All @@ -67,7 +66,6 @@
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.PingMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.StatusMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessage;
import tech.pegasys.teku.spec.schemas.SchemaDefinitionsDeneb;
import tech.pegasys.teku.storage.client.CombinedChainDataClient;
import tech.pegasys.teku.storage.client.RecentChainData;

Expand Down Expand Up @@ -337,11 +335,6 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
final RpcContextCodec<Bytes4, BlobSidecar> forkDigestContextCodec =
RpcContextCodec.forkDigest(spec, recentChainData, ForkDigestPayloadContext.BLOB_SIDECAR);

final BlobSidecarsByRootRequestMessageSchema requestType =
SchemaDefinitionsDeneb.required(
spec.forMilestone(SpecMilestone.DENEB).getSchemaDefinitions())
.getBlobSidecarsByRootRequestMessageSchema();

final BlobSidecarsByRootMessageHandler blobSidecarsByRootHandler =
new BlobSidecarsByRootMessageHandler(
spec, metricsSystem, getDenebForkEpoch(spec), combinedChainDataClient);
Expand All @@ -352,7 +345,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
BeaconChainMethodIds.BLOB_SIDECARS_BY_ROOT,
1,
rpcEncoding,
requestType,
BlobSidecarsByRootRequestMessage.SSZ_SCHEMA,
true,
forkDigestContextCodec,
blobSidecarsByRootHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods;

import static tech.pegasys.teku.networking.eth2.rpc.core.RpcResponseStatus.INVALID_REQUEST_CODE;
import static tech.pegasys.teku.spec.config.Constants.MAX_REQUEST_BLOB_SIDECARS;
import static tech.pegasys.teku.spec.config.Constants.MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;

import com.google.common.base.Throwables;
Expand All @@ -35,6 +36,7 @@
import tech.pegasys.teku.networking.eth2.rpc.core.RpcException.ResourceUnavailableException;
import tech.pegasys.teku.networking.p2p.rpc.StreamClosedException;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;
Expand Down Expand Up @@ -79,6 +81,20 @@ public BlobSidecarsByRootMessageHandler(
"Total number of blob sidecars requested in accepted blob sidecars by root requests from peers");
}

@Override
public Optional<RpcException> validateRequest(
final String protocolId, final BlobSidecarsByRootRequestMessage request) {
final int maxRequestSize = calculateMaxRequestSize();
if (request.size() > maxRequestSize) {
requestCounter.labels("count_too_big").inc();
return Optional.of(
new RpcException(
INVALID_REQUEST_CODE,
String.format("Maximum of %d blob sidecars allowed per request", maxRequestSize)));
}
return Optional.empty();
}

@Override
public void onIncomingMessage(
final String protocolId,
Expand Down Expand Up @@ -125,6 +141,14 @@ public void onIncomingMessage(
future.finish(callback::completeSuccessfully, err -> handleError(callback, err));
}

// MAX_REQUEST_BLOB_SIDECARS * MAX_BLOBS_PER_BLOCK
private int calculateMaxRequestSize() {
final UInt64 currentEpoch = combinedChainDataClient.getCurrentEpoch();
final int maxBlobsPerBlock =
SpecConfigDeneb.required(spec.atEpoch(currentEpoch).getConfig()).getMaxBlobsPerBlock();
return MAX_REQUEST_BLOB_SIDECARS.times(maxBlobsPerBlock).intValue();
}

private UInt64 getFinalizedEpoch() {
return combinedChainDataClient
.getFinalizedBlock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static tech.pegasys.teku.networking.eth2.rpc.core.RpcResponseStatus.INVALID_REQUEST_CODE;
import static tech.pegasys.teku.networking.eth2.rpc.core.RpcResponseStatus.RESOURCE_UNAVAILABLE;
import static tech.pegasys.teku.spec.config.Constants.MAX_CHUNK_SIZE_BELLATRIX;
import static tech.pegasys.teku.spec.config.Constants.MAX_REQUEST_BLOB_SIDECARS;

import java.util.List;
import java.util.Optional;
Expand All @@ -45,11 +46,10 @@
import tech.pegasys.teku.networking.eth2.rpc.core.encodings.RpcEncoding;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessage.BlobSidecarsByRootRequestMessageSchema;
import tech.pegasys.teku.spec.schemas.SchemaDefinitionsDeneb;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.storage.client.CombinedChainDataClient;

Expand Down Expand Up @@ -81,10 +81,6 @@ public class BlobSidecarsByRootMessageHandlerTest {
private final CombinedChainDataClient combinedChainDataClient =
mock(CombinedChainDataClient.class);

private final BlobSidecarsByRootRequestMessageSchema requestSchema =
SchemaDefinitionsDeneb.required(spec.atEpoch(denebForkEpoch).getSchemaDefinitions())
.getBlobSidecarsByRootRequestMessageSchema();

private final Eth2Peer peer = mock(Eth2Peer.class);

private final StubMetricsSystem metricsSystem = new StubMetricsSystem();
Expand Down Expand Up @@ -118,13 +114,44 @@ public void setup() {
when(callback.respond(any())).thenReturn(SafeFuture.COMPLETE);
}

@Test
public void validateRequest_shouldNotAllowRequestLargerThanMaximumAllowed() {
final int maxRequestSize =
MAX_REQUEST_BLOB_SIDECARS
.times(
SpecConfigDeneb.required(spec.atEpoch(denebForkEpoch).getConfig())
.getMaxBlobsPerBlock())
.intValue();

final BlobSidecarsByRootRequestMessage request =
new BlobSidecarsByRootRequestMessage(
dataStructureUtil.randomBlobIdentifiers(maxRequestSize + 1));

final Optional<RpcException> result = handler.validateRequest(protocolId, request);

assertThat(result)
.hasValueSatisfying(
rpcException -> {
assertThat(rpcException.getResponseCode()).isEqualTo(INVALID_REQUEST_CODE);
assertThat(rpcException.getErrorMessageString())
.isEqualTo("Maximum of %d blob sidecars allowed per request", maxRequestSize);
});

final long countTooBigCount =
metricsSystem
.getCounter(TekuMetricCategory.NETWORK, "rpc_blob_sidecars_by_root_requests_total")
.getValue("count_too_big");

assertThat(countTooBigCount).isOne();
}

@Test
public void shouldNotSendBlobSidecarsIfPeerIsRateLimited() {

when(peer.wantToReceiveBlobSidecars(callback, 5)).thenReturn(false);

final BlobSidecarsByRootRequestMessage request =
createRequest(dataStructureUtil.randomBlobIdentifiers(5));
new BlobSidecarsByRootRequestMessage(dataStructureUtil.randomBlobIdentifiers(5));

handler.onIncomingMessage(protocolId, peer, request, callback);

Expand All @@ -147,7 +174,8 @@ public void shouldSendResourceUnavailableIfBlockForBlockRootIsNotAvailable() {
when(combinedChainDataClient.getBlockByBlockRoot(secondBlockRoot))
.thenReturn(SafeFuture.completedFuture(Optional.empty()));

handler.onIncomingMessage(protocolId, peer, createRequest(blobIdentifiers), callback);
handler.onIncomingMessage(
protocolId, peer, new BlobSidecarsByRootRequestMessage(blobIdentifiers), callback);

verify(callback, times(1)).respond(blobSidecarCaptor.capture());
verify(callback).completeWithErrorResponse(rpcExceptionCaptor.capture());
Expand Down Expand Up @@ -175,7 +203,8 @@ public void shouldSendResourceUnavailableIfBlockForBlockRootIsNotAvailable() {
SafeFuture.completedFuture(
Optional.of(dataStructureUtil.randomSignedBeaconBlock(UInt64.ONE))));

handler.onIncomingMessage(protocolId, peer, createRequest(blobIdentifiers), callback);
handler.onIncomingMessage(
protocolId, peer, new BlobSidecarsByRootRequestMessage(blobIdentifiers), callback);

verify(callback, never()).respond(any());
verify(callback).completeWithErrorResponse(rpcExceptionCaptor.capture());
Expand All @@ -200,7 +229,8 @@ public void shouldSendResourceUnavailableIfBlobSidecarIsNotAvailable() {
secondBlobIdentifier.getBlockRoot(), secondBlobIdentifier.getIndex()))
.thenReturn(SafeFuture.completedFuture(Optional.empty()));

handler.onIncomingMessage(protocolId, peer, createRequest(blobIdentifiers), callback);
handler.onIncomingMessage(
protocolId, peer, new BlobSidecarsByRootRequestMessage(blobIdentifiers), callback);

verify(callback, times(1)).respond(blobSidecarCaptor.capture());
verify(callback).completeWithErrorResponse(rpcExceptionCaptor.capture());
Expand All @@ -220,7 +250,8 @@ public void shouldSendResourceUnavailableIfBlobSidecarIsNotAvailable() {
public void shouldSendToPeerRequestedBlobSidecars() {
final List<BlobIdentifier> blobIdentifiers = dataStructureUtil.randomBlobIdentifiers(5);

handler.onIncomingMessage(protocolId, peer, createRequest(blobIdentifiers), callback);
handler.onIncomingMessage(
protocolId, peer, new BlobSidecarsByRootRequestMessage(blobIdentifiers), callback);

verify(callback, times(5)).respond(blobSidecarCaptor.capture());

Expand All @@ -238,9 +269,4 @@ public void shouldSendToPeerRequestedBlobSidecars() {

verify(callback).completeSuccessfully();
}

private BlobSidecarsByRootRequestMessage createRequest(
final List<BlobIdentifier> blobIdentifiers) {
return new BlobSidecarsByRootRequestMessage(requestSchema, blobIdentifiers);
}
}

0 comments on commit e60436b

Please sign in to comment.