Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

6778 blob sidecars by range #6835

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
6d80365
Add BlobSidecarsByRange RPC method
mehdi-aouadi Feb 17, 2023
5d60402
Add BlobSidecarsByRange tests
mehdi-aouadi Feb 17, 2023
4c372d7
Suppress warnings
mehdi-aouadi Feb 17, 2023
a111ae7
Update max request size
mehdi-aouadi Feb 20, 2023
c91bc7f
Handle multiple blobs per block
mehdi-aouadi Feb 20, 2023
7f19b11
Add request validation
mehdi-aouadi Feb 21, 2023
f6e3a68
Add constants
mehdi-aouadi Feb 22, 2023
1b5432a
Refactor chain builder
mehdi-aouadi Feb 22, 2023
7d4e3ac
Handle requested blobs count zero
mehdi-aouadi Feb 22, 2023
5adb5ee
remove request validation
mehdi-aouadi Feb 22, 2023
d385f18
remove unused method
mehdi-aouadi Feb 22, 2023
b940370
cleanup unused methods
mehdi-aouadi Feb 22, 2023
cd75523
remove unused variables
mehdi-aouadi Feb 22, 2023
dbe05d2
add slot and indices iterator tests
mehdi-aouadi Feb 22, 2023
e173788
fix imports
mehdi-aouadi Feb 22, 2023
1e8e492
refactor slot and indices iteration
mehdi-aouadi Feb 23, 2023
bbbc5d7
refactor getBlobSidecarByBlockRootAndIndex
mehdi-aouadi Feb 27, 2023
611aea9
rename supplier
mehdi-aouadi Feb 27, 2023
8117138
remove unused parent block root
mehdi-aouadi Feb 27, 2023
d197efd
rename getMaximumRequestChunks
mehdi-aouadi Feb 27, 2023
95d35c7
remove unused method
mehdi-aouadi Feb 27, 2023
9fc9c56
do no send request to peer when count is zero
mehdi-aouadi Feb 27, 2023
cb4f734
snake case
mehdi-aouadi Feb 27, 2023
1db1110
remove irrelevant comment
mehdi-aouadi Feb 27, 2023
f06aa43
ignore request when count is zero
mehdi-aouadi Feb 27, 2023
4980eb1
move peers requests throttling delay to constants
mehdi-aouadi Feb 27, 2023
1bf25a2
remove unnecessary test
mehdi-aouadi Feb 27, 2023
c1e69fe
use global duration
mehdi-aouadi Feb 27, 2023
e6437c2
add blob sidecars by range to the beacon chain method
mehdi-aouadi Feb 28, 2023
afdcef8
rebase fixes
mehdi-aouadi Feb 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package tech.pegasys.teku.beacon.sync.forward.multipeer.chains;

import static tech.pegasys.teku.spec.config.Constants.MAX_BLOBS_SIDECARS_PER_MINUTE;
import static tech.pegasys.teku.spec.config.Constants.MAX_BLOB_SIDECARS_PER_MINUTE;
import static tech.pegasys.teku.spec.config.Constants.MAX_BLOCKS_PER_MINUTE;
import static tech.pegasys.teku.spec.config.Constants.SYNC_BATCH_SIZE;
import static tech.pegasys.teku.spec.config.Constants.SYNC_BLOBS_SIDECARS_SIZE;
import static tech.pegasys.teku.spec.config.Constants.SYNC_BLOB_SIDECARS_SIZE;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -41,11 +43,18 @@ public SyncSource getOrCreateSyncSource(final Eth2Peer peer) {
final int maxBlocksPerMinute = MAX_BLOCKS_PER_MINUTE - SYNC_BATCH_SIZE.intValue() - 1;
final int maxBlobsSidecarsPerMinute =
MAX_BLOBS_SIDECARS_PER_MINUTE - SYNC_BLOBS_SIDECARS_SIZE.intValue() - 1;
final int maxBlobSidecarsPerMinute =
MAX_BLOB_SIDECARS_PER_MINUTE - SYNC_BLOB_SIDECARS_SIZE.intValue() - 1;
return syncSourcesByPeer.computeIfAbsent(
peer,
source ->
new ThrottlingSyncSource(
asyncRunner, timeProvider, source, maxBlocksPerMinute, maxBlobsSidecarsPerMinute));
asyncRunner,
timeProvider,
source,
maxBlocksPerMinute,
maxBlobsSidecarsPerMinute,
maxBlobSidecarsPerMinute));
}

public void onPeerDisconnected(final Eth2Peer peer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,34 @@
import tech.pegasys.teku.networking.p2p.reputation.ReputationAdjustment;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar;

public class ThrottlingSyncSource implements SyncSource {
private static final Logger LOG = LogManager.getLogger();
private static final long TIME_OUT = 60;
public static final Duration PEER_REQUEST_DELAY = Duration.ofSeconds(3);
private final AsyncRunner asyncRunner;
private final SyncSource delegate;

private final RateTracker blocksRateTracker;
private final RateTracker blobsSidecarsRateTracker;
private final RateTracker blobSidecarsRateTracker;

public ThrottlingSyncSource(
final AsyncRunner asyncRunner,
final TimeProvider timeProvider,
final SyncSource delegate,
final int maxBlocksPerMinute,
final int maxBlobsSidecarsPerMinute) {
final int maxBlobsSidecarsPerMinute,
final int maxBlobSidecarsPerMinute) {
this.asyncRunner = asyncRunner;
this.delegate = delegate;
this.blocksRateTracker = new RateTracker(maxBlocksPerMinute, 60, timeProvider);
this.blobsSidecarsRateTracker = new RateTracker(maxBlobsSidecarsPerMinute, 60, timeProvider);
this.blocksRateTracker = new RateTracker(maxBlocksPerMinute, TIME_OUT, timeProvider);
this.blobsSidecarsRateTracker =
new RateTracker(maxBlobsSidecarsPerMinute, TIME_OUT, timeProvider);
this.blobSidecarsRateTracker =
new RateTracker(maxBlobSidecarsPerMinute, TIME_OUT, timeProvider);
}

@Override
Expand All @@ -58,7 +66,7 @@ public SafeFuture<Void> requestBlocksByRange(
return delegate.requestBlocksByRange(startSlot, count, listener);
} else {
return asyncRunner.runAfterDelay(
() -> requestBlocksByRange(startSlot, count, listener), Duration.ofSeconds(3));
() -> requestBlocksByRange(startSlot, count, listener), PEER_REQUEST_DELAY);
}
}

Expand All @@ -72,7 +80,19 @@ public SafeFuture<Void> requestBlobsSidecarsByRange(
return delegate.requestBlobsSidecarsByRange(startSlot, count, listener);
} else {
return asyncRunner.runAfterDelay(
() -> requestBlobsSidecarsByRange(startSlot, count, listener), Duration.ofSeconds(3));
() -> requestBlobsSidecarsByRange(startSlot, count, listener), PEER_REQUEST_DELAY);
}
}

@Override
public SafeFuture<Void> requestBlobSidecarsByRange(
final UInt64 startSlot, final UInt64 count, final RpcResponseListener<BlobSidecar> listener) {
if (blobSidecarsRateTracker.wantToRequestObjects(count.longValue()) > 0) {
LOG.debug("Sending request for {} blob sidecars", count);
return delegate.requestBlobSidecarsByRange(startSlot, count, listener);
} else {
return asyncRunner.runAfterDelay(
() -> requestBlobSidecarsByRange(startSlot, count, listener), PEER_REQUEST_DELAY);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
import tech.pegasys.teku.networking.p2p.peer.DisconnectReason;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.execution.versions.deneb.BlobsSidecar;

class ThrottlingSyncSourceTest {

private static final int MAX_BLOCKS_PER_MINUTE = 100;
private static final int MAX_BLOBS_SIDECARS_PER_MINUTE = 100;
private static final int MAX_BLOB_SIDECARS_PER_MINUTE = 100;
private final StubAsyncRunner asyncRunner = new StubAsyncRunner();
private final StubTimeProvider timeProvider = StubTimeProvider.withTimeInSeconds(0);
private final SyncSource delegate = mock(SyncSource.class);
Expand All @@ -47,13 +49,18 @@ class ThrottlingSyncSourceTest {
private final RpcResponseListener<BlobsSidecar> blobsSidecarsListener =
mock(RpcResponseListener.class);

@SuppressWarnings("unchecked")
private final RpcResponseListener<BlobSidecar> blobSidecarsListener =
mock(RpcResponseListener.class);

private final ThrottlingSyncSource source =
new ThrottlingSyncSource(
asyncRunner,
timeProvider,
delegate,
MAX_BLOCKS_PER_MINUTE,
MAX_BLOBS_SIDECARS_PER_MINUTE);
MAX_BLOBS_SIDECARS_PER_MINUTE,
MAX_BLOB_SIDECARS_PER_MINUTE);

@Test
void shouldDelegateDisconnectImmediately() {
Expand Down Expand Up @@ -93,6 +100,21 @@ void shouldRequestBlobsSidecarsImmediatelyIfRateLimitNotExceeded() {
.requestBlobsSidecarsByRange(UInt64.valueOf(100), count, blobsSidecarsListener));
}

@Test
void shouldRequestBlobSidecarsImmediatelyIfRateLimitNotExceeded() {
final UInt64 count = UInt64.valueOf(MAX_BLOB_SIDECARS_PER_MINUTE - 1);
ignoreFuture(source.requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
ignoreFuture(
source.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));

// Both requests happen immediately
ignoreFuture(
verify(delegate).requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
ignoreFuture(
verify(delegate)
.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));
}

@Test
void shouldDelayRequestIfBlockLimitAlreadyExceeded() {
final UInt64 count = UInt64.valueOf(MAX_BLOCKS_PER_MINUTE);
Expand Down Expand Up @@ -129,6 +151,25 @@ void shouldDelayRequestIfBlobsSidecarsLimitAlreadyExceeded() {
.requestBlobsSidecarsByRange(UInt64.valueOf(100), count, blobsSidecarsListener));
}

@Test
void shouldDelayRequestIfBlobSidecarsLimitAlreadyExceeded() {
final UInt64 count = UInt64.valueOf(MAX_BLOB_SIDECARS_PER_MINUTE);
ignoreFuture(source.requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
ignoreFuture(
source.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));

ignoreFuture(
verify(delegate).requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
verifyNoMoreInteractions(delegate);

timeProvider.advanceTimeBySeconds(61);
asyncRunner.executeQueuedActions();

ignoreFuture(
verify(delegate)
.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));
}

@Test
void shouldContinueDelayingBlocksRequestIfRequestStillExceeded() {
final UInt64 count = UInt64.valueOf(MAX_BLOCKS_PER_MINUTE);
Expand Down Expand Up @@ -170,4 +211,27 @@ void shouldContinueDelayingBlobsSidecarsRequestIfRequestStillExceeded() {
verify(delegate)
.requestBlobsSidecarsByRange(UInt64.valueOf(100), count, blobsSidecarsListener));
}

@Test
void shouldContinueDelayingBlobSidecarsRequestIfRequestStillExceeded() {
final UInt64 count = UInt64.valueOf(MAX_BLOB_SIDECARS_PER_MINUTE);
ignoreFuture(source.requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
ignoreFuture(
source.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));

// Both requests happen immediately
ignoreFuture(
verify(delegate).requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
verifyNoMoreInteractions(delegate);

timeProvider.advanceTimeBySeconds(30);
asyncRunner.executeQueuedActions();
verifyNoMoreInteractions(delegate);

timeProvider.advanceTimeBySeconds(31);
asyncRunner.executeQueuedActions();
ignoreFuture(
verify(delegate)
.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class Constants {
public static final UInt64 MAX_REQUEST_BLOB_SIDECARS = UInt64.valueOf(128);

public static final int MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS = 4096;
public static final int MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS = 4096;

// Teku Networking Specific
public static final int VALID_BLOCK_SET_SIZE = 1000;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class Constants {
public static final UInt64 SYNC_BLOB_SIDECARS_SIZE = UInt64.valueOf(50);
public static final int MAX_BLOCKS_PER_MINUTE = 500;
public static final int MAX_BLOBS_SIDECARS_PER_MINUTE = 500;
public static final int MAX_BLOB_SIDECARS_PER_MINUTE = 1000;

// Teku Validator Client Specific
public static final Duration GENESIS_DATA_RETRY_DELAY = Duration.ofSeconds(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private BeaconBlockAndBlobsSidecarByRootRequestMessage(final TreeNode node) {
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public UInt64 getMaxSlot() {
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return Math.toIntExact(getCount().longValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private BeaconBlocksByRootRequestMessage(TreeNode node) {
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return size();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright ConsenSys Software Inc., 2023
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc;

import tech.pegasys.teku.infrastructure.ssz.containers.Container2;
import tech.pegasys.teku.infrastructure.ssz.containers.ContainerSchema2;
import tech.pegasys.teku.infrastructure.ssz.primitive.SszUInt64;
import tech.pegasys.teku.infrastructure.ssz.schema.SszPrimitiveSchemas;
import tech.pegasys.teku.infrastructure.ssz.tree.TreeNode;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;

public class BlobSidecarsByRangeRequestMessage
extends Container2<BlobSidecarsByRangeRequestMessage, SszUInt64, SszUInt64>
implements RpcRequest {

public static class BlobSidecarsByRangeRequestMessageSchema
extends ContainerSchema2<BlobSidecarsByRangeRequestMessage, SszUInt64, SszUInt64> {

public BlobSidecarsByRangeRequestMessageSchema() {
super(
"BlobSidecarsByRangeRequestMessage",
namedSchema("start_slot", SszPrimitiveSchemas.UINT64_SCHEMA),
namedSchema("count", SszPrimitiveSchemas.UINT64_SCHEMA));
}

@Override
public BlobSidecarsByRangeRequestMessage createFromBackingNode(final TreeNode node) {
return new BlobSidecarsByRangeRequestMessage(this, node);
}
}

public static final BlobSidecarsByRangeRequestMessage.BlobSidecarsByRangeRequestMessageSchema
SSZ_SCHEMA = new BlobSidecarsByRangeRequestMessage.BlobSidecarsByRangeRequestMessageSchema();

private BlobSidecarsByRangeRequestMessage(
final BlobSidecarsByRangeRequestMessage.BlobSidecarsByRangeRequestMessageSchema type,
final TreeNode backingNode) {
super(type, backingNode);
}

public BlobSidecarsByRangeRequestMessage(final UInt64 startSlot, final UInt64 count) {
super(SSZ_SCHEMA, SszUInt64.of(startSlot), SszUInt64.of(count));
}

public UInt64 getStartSlot() {
return getField0().get();
}

public UInt64 getCount() {
return getField1().get();
}

public UInt64 getMaxSlot() {
return getStartSlot().plus(getCount()).minusMinZero(1);
}

@Override
public int getMaximumResponseChunks() {
return getCount().intValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private BlobSidecarsByRootRequestMessage(
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return size();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ public UInt64 getCount() {
}

public UInt64 getMaxSlot() {
return getStartSlot().plus(getCount().minus(1));
return getStartSlot().plus(getCount()).minusMinZero(1);
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return getCount().intValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public String toString() {
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public UInt64 getReason() {
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public UInt64 getSeqNumber() {
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@

public interface RpcRequest extends SszData {

int getMaximumRequestChunks();
int getMaximumResponseChunks();
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public UInt64 getHeadSlot() {
}

@Override
public int getMaximumRequestChunks() {
public int getMaximumResponseChunks() {
return 1;
}
}
Loading