Skip to content

Commit

Permalink
6778 blob sidecars by range (Consensys#6835)
Browse files Browse the repository at this point in the history
* Add BlobSidecarsByRange RPC method
  • Loading branch information
mehdi-aouadi authored and rolfyone committed Mar 1, 2023
1 parent fbf6e11 commit 164a68b
Show file tree
Hide file tree
Showing 33 changed files with 1,060 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package tech.pegasys.teku.beacon.sync.forward.multipeer.chains;

import static tech.pegasys.teku.spec.config.Constants.MAX_BLOBS_SIDECARS_PER_MINUTE;
import static tech.pegasys.teku.spec.config.Constants.MAX_BLOB_SIDECARS_PER_MINUTE;
import static tech.pegasys.teku.spec.config.Constants.MAX_BLOCKS_PER_MINUTE;
import static tech.pegasys.teku.spec.config.Constants.SYNC_BATCH_SIZE;
import static tech.pegasys.teku.spec.config.Constants.SYNC_BLOBS_SIDECARS_SIZE;
import static tech.pegasys.teku.spec.config.Constants.SYNC_BLOB_SIDECARS_SIZE;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -41,11 +43,18 @@ public SyncSource getOrCreateSyncSource(final Eth2Peer peer) {
final int maxBlocksPerMinute = MAX_BLOCKS_PER_MINUTE - SYNC_BATCH_SIZE.intValue() - 1;
final int maxBlobsSidecarsPerMinute =
MAX_BLOBS_SIDECARS_PER_MINUTE - SYNC_BLOBS_SIDECARS_SIZE.intValue() - 1;
final int maxBlobSidecarsPerMinute =
MAX_BLOB_SIDECARS_PER_MINUTE - SYNC_BLOB_SIDECARS_SIZE.intValue() - 1;
return syncSourcesByPeer.computeIfAbsent(
peer,
source ->
new ThrottlingSyncSource(
asyncRunner, timeProvider, source, maxBlocksPerMinute, maxBlobsSidecarsPerMinute));
asyncRunner,
timeProvider,
source,
maxBlocksPerMinute,
maxBlobsSidecarsPerMinute,
maxBlobSidecarsPerMinute));
}

public void onPeerDisconnected(final Eth2Peer peer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,34 @@
import tech.pegasys.teku.networking.p2p.reputation.ReputationAdjustment;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar;

public class ThrottlingSyncSource implements SyncSource {
private static final Logger LOG = LogManager.getLogger();
private static final long TIME_OUT = 60;
public static final Duration PEER_REQUEST_DELAY = Duration.ofSeconds(3);
private final AsyncRunner asyncRunner;
private final SyncSource delegate;

private final RateTracker blocksRateTracker;
private final RateTracker blobsSidecarsRateTracker;
private final RateTracker blobSidecarsRateTracker;

public ThrottlingSyncSource(
final AsyncRunner asyncRunner,
final TimeProvider timeProvider,
final SyncSource delegate,
final int maxBlocksPerMinute,
final int maxBlobsSidecarsPerMinute) {
final int maxBlobsSidecarsPerMinute,
final int maxBlobSidecarsPerMinute) {
this.asyncRunner = asyncRunner;
this.delegate = delegate;
this.blocksRateTracker = new RateTracker(maxBlocksPerMinute, 60, timeProvider);
this.blobsSidecarsRateTracker = new RateTracker(maxBlobsSidecarsPerMinute, 60, timeProvider);
this.blocksRateTracker = new RateTracker(maxBlocksPerMinute, TIME_OUT, timeProvider);
this.blobsSidecarsRateTracker =
new RateTracker(maxBlobsSidecarsPerMinute, TIME_OUT, timeProvider);
this.blobSidecarsRateTracker =
new RateTracker(maxBlobSidecarsPerMinute, TIME_OUT, timeProvider);
}

@Override
Expand All @@ -58,7 +66,7 @@ public SafeFuture<Void> requestBlocksByRange(
return delegate.requestBlocksByRange(startSlot, count, listener);
} else {
return asyncRunner.runAfterDelay(
() -> requestBlocksByRange(startSlot, count, listener), Duration.ofSeconds(3));
() -> requestBlocksByRange(startSlot, count, listener), PEER_REQUEST_DELAY);
}
}

Expand All @@ -72,7 +80,19 @@ public SafeFuture<Void> requestBlobsSidecarsByRange(
return delegate.requestBlobsSidecarsByRange(startSlot, count, listener);
} else {
return asyncRunner.runAfterDelay(
() -> requestBlobsSidecarsByRange(startSlot, count, listener), Duration.ofSeconds(3));
() -> requestBlobsSidecarsByRange(startSlot, count, listener), PEER_REQUEST_DELAY);
}
}

@Override
public SafeFuture<Void> requestBlobSidecarsByRange(
final UInt64 startSlot, final UInt64 count, final RpcResponseListener<BlobSidecar> listener) {
if (blobSidecarsRateTracker.wantToRequestObjects(count.longValue()) > 0) {
LOG.debug("Sending request for {} blob sidecars", count);
return delegate.requestBlobSidecarsByRange(startSlot, count, listener);
} else {
return asyncRunner.runAfterDelay(
() -> requestBlobSidecarsByRange(startSlot, count, listener), PEER_REQUEST_DELAY);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
import tech.pegasys.teku.networking.p2p.peer.DisconnectReason;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar;

class ThrottlingSyncSourceTest {

private static final int MAX_BLOCKS_PER_MINUTE = 100;
private static final int MAX_BLOBS_SIDECARS_PER_MINUTE = 100;
private static final int MAX_BLOB_SIDECARS_PER_MINUTE = 100;
private final StubAsyncRunner asyncRunner = new StubAsyncRunner();
private final StubTimeProvider timeProvider = StubTimeProvider.withTimeInSeconds(0);
private final SyncSource delegate = mock(SyncSource.class);
Expand All @@ -47,13 +49,18 @@ class ThrottlingSyncSourceTest {
private final RpcResponseListener<BlobsSidecar> blobsSidecarsListener =
mock(RpcResponseListener.class);

@SuppressWarnings("unchecked")
private final RpcResponseListener<BlobSidecar> blobSidecarsListener =
mock(RpcResponseListener.class);

private final ThrottlingSyncSource source =
new ThrottlingSyncSource(
asyncRunner,
timeProvider,
delegate,
MAX_BLOCKS_PER_MINUTE,
MAX_BLOBS_SIDECARS_PER_MINUTE);
MAX_BLOBS_SIDECARS_PER_MINUTE,
MAX_BLOB_SIDECARS_PER_MINUTE);

@Test
void shouldDelegateDisconnectImmediately() {
Expand Down Expand Up @@ -93,6 +100,21 @@ void shouldRequestBlobsSidecarsImmediatelyIfRateLimitNotExceeded() {
.requestBlobsSidecarsByRange(UInt64.valueOf(100), count, blobsSidecarsListener));
}

@Test
void shouldRequestBlobSidecarsImmediatelyIfRateLimitNotExceeded() {
final UInt64 count = UInt64.valueOf(MAX_BLOB_SIDECARS_PER_MINUTE - 1);
ignoreFuture(source.requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
ignoreFuture(
source.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));

// Both requests happen immediately
ignoreFuture(
verify(delegate).requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
ignoreFuture(
verify(delegate)
.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));
}

@Test
void shouldDelayRequestIfBlockLimitAlreadyExceeded() {
final UInt64 count = UInt64.valueOf(MAX_BLOCKS_PER_MINUTE);
Expand Down Expand Up @@ -129,6 +151,25 @@ void shouldDelayRequestIfBlobsSidecarsLimitAlreadyExceeded() {
.requestBlobsSidecarsByRange(UInt64.valueOf(100), count, blobsSidecarsListener));
}

@Test
void shouldDelayRequestIfBlobSidecarsLimitAlreadyExceeded() {
final UInt64 count = UInt64.valueOf(MAX_BLOB_SIDECARS_PER_MINUTE);
ignoreFuture(source.requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
ignoreFuture(
source.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));

ignoreFuture(
verify(delegate).requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
verifyNoMoreInteractions(delegate);

timeProvider.advanceTimeBySeconds(61);
asyncRunner.executeQueuedActions();

ignoreFuture(
verify(delegate)
.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));
}

@Test
void shouldContinueDelayingBlocksRequestIfRequestStillExceeded() {
final UInt64 count = UInt64.valueOf(MAX_BLOCKS_PER_MINUTE);
Expand Down Expand Up @@ -170,4 +211,27 @@ void shouldContinueDelayingBlobsSidecarsRequestIfRequestStillExceeded() {
verify(delegate)
.requestBlobsSidecarsByRange(UInt64.valueOf(100), count, blobsSidecarsListener));
}

@Test
void shouldContinueDelayingBlobSidecarsRequestIfRequestStillExceeded() {
final UInt64 count = UInt64.valueOf(MAX_BLOB_SIDECARS_PER_MINUTE);
ignoreFuture(source.requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
ignoreFuture(
source.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));

// Both requests happen immediately
ignoreFuture(
verify(delegate).requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
verifyNoMoreInteractions(delegate);

timeProvider.advanceTimeBySeconds(30);
asyncRunner.executeQueuedActions();
verifyNoMoreInteractions(delegate);

timeProvider.advanceTimeBySeconds(31);
asyncRunner.executeQueuedActions();
ignoreFuture(
verify(delegate)
.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class Constants {
public static final UInt64 MAX_REQUEST_BLOB_SIDECARS = UInt64.valueOf(128);

public static final int MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS = 4096;
public static final int MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS = 4096;

// Teku Networking Specific
public static final int VALID_BLOCK_SET_SIZE = 1000;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class Constants {
public static final UInt64 SYNC_BLOB_SIDECARS_SIZE = UInt64.valueOf(50);
public static final int MAX_BLOCKS_PER_MINUTE = 500;
public static final int MAX_BLOBS_SIDECARS_PER_MINUTE = 500;
public static final int MAX_BLOB_SIDECARS_PER_MINUTE = 1000;

// Teku Validator Client Specific
public static final Duration GENESIS_DATA_RETRY_DELAY = Duration.ofSeconds(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private BeaconBlockAndBlobsSidecarByRootRequestMessage(final TreeNode node) {
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public UInt64 getMaxSlot() {
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return Math.toIntExact(getCount().longValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private BeaconBlocksByRootRequestMessage(TreeNode node) {
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return size();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright ConsenSys Software Inc., 2023
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc;

import tech.pegasys.teku.infrastructure.ssz.containers.Container2;
import tech.pegasys.teku.infrastructure.ssz.containers.ContainerSchema2;
import tech.pegasys.teku.infrastructure.ssz.primitive.SszUInt64;
import tech.pegasys.teku.infrastructure.ssz.schema.SszPrimitiveSchemas;
import tech.pegasys.teku.infrastructure.ssz.tree.TreeNode;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;

public class BlobSidecarsByRangeRequestMessage
extends Container2<BlobSidecarsByRangeRequestMessage, SszUInt64, SszUInt64>
implements RpcRequest {

public static class BlobSidecarsByRangeRequestMessageSchema
extends ContainerSchema2<BlobSidecarsByRangeRequestMessage, SszUInt64, SszUInt64> {

public BlobSidecarsByRangeRequestMessageSchema() {
super(
"BlobSidecarsByRangeRequestMessage",
namedSchema("start_slot", SszPrimitiveSchemas.UINT64_SCHEMA),
namedSchema("count", SszPrimitiveSchemas.UINT64_SCHEMA));
}

@Override
public BlobSidecarsByRangeRequestMessage createFromBackingNode(final TreeNode node) {
return new BlobSidecarsByRangeRequestMessage(this, node);
}
}

public static final BlobSidecarsByRangeRequestMessage.BlobSidecarsByRangeRequestMessageSchema
SSZ_SCHEMA = new BlobSidecarsByRangeRequestMessage.BlobSidecarsByRangeRequestMessageSchema();

private BlobSidecarsByRangeRequestMessage(
final BlobSidecarsByRangeRequestMessage.BlobSidecarsByRangeRequestMessageSchema type,
final TreeNode backingNode) {
super(type, backingNode);
}

public BlobSidecarsByRangeRequestMessage(final UInt64 startSlot, final UInt64 count) {
super(SSZ_SCHEMA, SszUInt64.of(startSlot), SszUInt64.of(count));
}

public UInt64 getStartSlot() {
return getField0().get();
}

public UInt64 getCount() {
return getField1().get();
}

public UInt64 getMaxSlot() {
return getStartSlot().plus(getCount()).minusMinZero(1);
}

@Override
public int getMaximumResponseChunks() {
return getCount().intValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private BlobSidecarsByRootRequestMessage(
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return size();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ public UInt64 getCount() {
}

public UInt64 getMaxSlot() {
return getStartSlot().plus(getCount().minus(1));
return getStartSlot().plus(getCount()).minusMinZero(1);
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return getCount().intValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public String toString() {
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public UInt64 getReason() {
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public UInt64 getSeqNumber() {
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@

public interface RpcRequest extends SszData {

int getMaximumRequestChunks();
int getMaximumResponseChunks();
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public UInt64 getHeadSlot() {
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return 1;
}
}
Loading

0 comments on commit 164a68b

Please sign in to comment.