Skip to content

Commit

Permalink
Throttle outgoing requests by both peer and protocol id (#8969)
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov authored Jan 9, 2025
1 parent c0a17cf commit 6cba4f6
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 65 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
- Remove delay when fetching blobs from the local EL on block arrival

### Bug Fixes
- Fix `--version` command output [#8960](https://github.com/Consensys/teku/issues/8960)
- Fix `--version` command output [#8960](https://github.com/Consensys/teku/issues/8960)
- Fix issue (introduced in `24.12.1`) with peer stability when the upperbound is set to a high number
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,5 @@ public interface Eth2RpcMethod<TRequest extends RpcRequest & SszData, TResponse

@Override
Eth2OutgoingRequestHandler<TRequest, TResponse> createOutgoingRequestHandler(
String protocolId,
final TRequest request,
Eth2RpcResponseHandler<TResponse, ?> responseHandler);
String protocolId, TRequest request, Eth2RpcResponseHandler<TResponse, ?> responseHandler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import static tech.pegasys.teku.networking.p2p.libp2p.LibP2PNetwork.REMOTE_OPEN_STREAMS_RATE_LIMIT;
import static tech.pegasys.teku.networking.p2p.libp2p.LibP2PNetwork.REMOTE_PARALLEL_OPEN_STREAMS_COUNT_LIMIT;
import static tech.pegasys.teku.spec.constants.NetworkConstants.MAX_CONCURRENT_REQUESTS;

import com.google.common.base.Preconditions;
import identify.pb.IdentifyOuterClass;
Expand Down Expand Up @@ -153,9 +152,7 @@ public P2PNetwork<Peer> build() {
}

protected List<? extends RpcHandler<?, ?, ?>> createRpcHandlers() {
return rpcMethods.stream()
.map(m -> new RpcHandler<>(asyncRunner, m, MAX_CONCURRENT_REQUESTS))
.toList();
return rpcMethods.stream().map(m -> new RpcHandler<>(asyncRunner, m)).toList();
}

protected LibP2PGossipNetwork createGossipNetwork() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.libp2p.core.PeerId;
import io.libp2p.core.crypto.PubKey;
import io.libp2p.protocol.Identify;
import io.libp2p.protocol.IdentifyController;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -28,6 +29,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue;
import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler;
import tech.pegasys.teku.networking.p2p.network.PeerAddress;
import tech.pegasys.teku.networking.p2p.peer.DisconnectReason;
Expand All @@ -41,11 +43,12 @@
import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController;
import tech.pegasys.teku.spec.constants.NetworkConstants;

public class LibP2PPeer implements Peer {
private static final Logger LOG = LogManager.getLogger();

private final Map<RpcMethod<?, ?, ?>, RpcHandler<?, ?, ?>> rpcHandlers;
private final Map<RpcMethod<?, ?, ?>, ThrottlingRpcHandler<?, ?, ?>> rpcHandlers;
private final ReputationManager reputationManager;
private final Function<PeerId, Double> peerScoreFunction;
private final Connection connection;
Expand All @@ -71,7 +74,8 @@ public LibP2PPeer(
final Function<PeerId, Double> peerScoreFunction) {
this.connection = connection;
this.rpcHandlers =
rpcHandlers.stream().collect(Collectors.toMap(RpcHandler::getRpcMethod, h -> h));
rpcHandlers.stream()
.collect(Collectors.toMap(RpcHandler::getRpcMethod, ThrottlingRpcHandler::new));
this.reputationManager = reputationManager;
this.peerScoreFunction = peerScoreFunction;
this.peerId = connection.secureSession().getRemoteId();
Expand Down Expand Up @@ -109,10 +113,6 @@ private PeerClientType getPeerTypeFromAgentString(final String agentVersion) {
return EnumUtils.getEnumIgnoreCase(PeerClientType.class, agent, PeerClientType.UNKNOWN);
}

public Optional<String> getMaybeAgentString() {
return maybeAgentString;
}

public PubKey getPubKey() {
return pubKey;
}
Expand Down Expand Up @@ -161,7 +161,7 @@ private SafeFuture<IdentifyOuterClass.Identify> getIdentify() {
.muxerSession()
.createStream(new Identify())
.getController()
.thenCompose(controller -> controller.id()))
.thenCompose(IdentifyController::id))
.exceptionallyCompose(
error -> {
LOG.debug("Failed to get peer identity", error);
Expand Down Expand Up @@ -208,8 +208,8 @@ SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequest(
final TRequest request,
final RespHandler responseHandler) {
@SuppressWarnings("unchecked")
RpcHandler<TOutgoingHandler, TRequest, RespHandler> rpcHandler =
(RpcHandler<TOutgoingHandler, TRequest, RespHandler>) rpcHandlers.get(rpcMethod);
final ThrottlingRpcHandler<TOutgoingHandler, TRequest, RespHandler> rpcHandler =
(ThrottlingRpcHandler<TOutgoingHandler, TRequest, RespHandler>) rpcHandlers.get(rpcMethod);
if (rpcHandler == null) {
throw new IllegalArgumentException(
"Unknown rpc method invoked: " + String.join(",", rpcMethod.getIds()));
Expand Down Expand Up @@ -240,4 +240,26 @@ public void adjustReputation(final ReputationAdjustment adjustment) {
disconnectCleanly(DisconnectReason.REMOTE_FAULT).ifExceptionGetsHereRaiseABug();
}
}

private static class ThrottlingRpcHandler<
TOutgoingHandler extends RpcRequestHandler,
TRequest,
TRespHandler extends RpcResponseHandler<?>> {

private final RpcHandler<TOutgoingHandler, TRequest, TRespHandler> delegate;

private final ThrottlingTaskQueue requestsQueue =
ThrottlingTaskQueue.create(NetworkConstants.MAX_CONCURRENT_REQUESTS);

private ThrottlingRpcHandler(
final RpcHandler<TOutgoingHandler, TRequest, TRespHandler> delegate) {
this.delegate = delegate;
}

private SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequest(
final Connection connection, final TRequest request, final TRespHandler responseHandler) {
return requestsQueue.queueTask(
() -> delegate.sendRequest(connection, request, responseHandler));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.SafeFuture.Interruptor;
import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue;
import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil;
import tech.pegasys.teku.networking.p2p.libp2p.LibP2PNodeId;
import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler.Controller;
Expand All @@ -63,15 +62,12 @@ public class RpcHandler<

private final AsyncRunner asyncRunner;
private final RpcMethod<TOutgoingHandler, TRequest, TRespHandler> rpcMethod;
private final ThrottlingTaskQueue concurrentRequestsQueue;

public RpcHandler(
final AsyncRunner asyncRunner,
final RpcMethod<TOutgoingHandler, TRequest, TRespHandler> rpcMethod,
final int maxConcurrentRequests) {
final RpcMethod<TOutgoingHandler, TRequest, TRespHandler> rpcMethod) {
this.asyncRunner = asyncRunner;
this.rpcMethod = rpcMethod;
concurrentRequestsQueue = ThrottlingTaskQueue.create(maxConcurrentRequests);
}

public RpcMethod<TOutgoingHandler, TRequest, TRespHandler> getRpcMethod() {
Expand All @@ -80,13 +76,6 @@ public RpcMethod<TOutgoingHandler, TRequest, TRespHandler> getRpcMethod() {

public SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequest(
final Connection connection, final TRequest request, final TRespHandler responseHandler) {
return concurrentRequestsQueue.queueTask(
() -> sendRequestInternal(connection, request, responseHandler));
}

public SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequestInternal(
final Connection connection, final TRequest request, final TRespHandler responseHandler) {

final Bytes initialPayload;
try {
initialPayload = rpcMethod.encodeRequest(request);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Consensys Software Inc., 2025
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.networking.p2p.libp2p;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.libp2p.core.Connection;
import io.libp2p.core.security.SecureChannel.Session;
import java.util.List;
import java.util.stream.IntStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler;
import tech.pegasys.teku.networking.p2p.reputation.ReputationManager;
import tech.pegasys.teku.networking.p2p.rpc.RpcMethod;
import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController;
import tech.pegasys.teku.spec.constants.NetworkConstants;

public class LibP2PPeerTest {

private final Connection connection = mock(Connection.class);

@SuppressWarnings("unchecked")
private final RpcHandler<RpcRequestHandler, Object, RpcResponseHandler<Void>> rpcHandler =
mock(RpcHandler.class);

@SuppressWarnings("unchecked")
private final RpcMethod<RpcRequestHandler, Object, RpcResponseHandler<Void>> rpcMethod =
mock(RpcMethod.class);

private LibP2PPeer libP2PPeer;

@BeforeEach
public void init() {
when(rpcHandler.getRpcMethod()).thenReturn(rpcMethod);
final Session secureSession = mock(Session.class);
when(connection.secureSession()).thenReturn(secureSession);
when(connection.closeFuture()).thenReturn(new SafeFuture<>());
libP2PPeer =
new LibP2PPeer(connection, List.of(rpcHandler), ReputationManager.NOOP, peer -> 0.0);
}

@SuppressWarnings({"unchecked", "FutureReturnValueIgnored"})
@Test
public void sendRequest_throttlesRequests() {

// fill the queue with incomplete futures
final List<SafeFuture<RpcStreamController<RpcRequestHandler>>> queuedFutures =
IntStream.range(0, NetworkConstants.MAX_CONCURRENT_REQUESTS)
.mapToObj(
__ -> {
final SafeFuture<RpcStreamController<RpcRequestHandler>> future =
new SafeFuture<>();
when(rpcHandler.sendRequest(connection, null, null)).thenReturn(future);
libP2PPeer.sendRequest(rpcMethod, null, null);
return future;
})
.toList();

when(rpcHandler.sendRequest(connection, null, null))
.thenReturn(SafeFuture.completedFuture(mock(RpcStreamController.class)));

final SafeFuture<RpcStreamController<RpcRequestHandler>> throttledRequest =
libP2PPeer.sendRequest(rpcMethod, null, null);

// completed request should be throttled
assertThat(throttledRequest).isNotDone();

// empty the queue
queuedFutures.forEach(future -> future.complete(mock(RpcStreamController.class)));

// throttled request should have completed now
assertThat(throttledRequest).isDone();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.libp2p.core.mux.StreamMuxer.Session;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import kotlin.Unit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -53,9 +52,8 @@ public class RpcHandlerTest {

StubAsyncRunner asyncRunner = new StubAsyncRunner();
RpcMethod<RpcRequestHandler, Object, RpcResponseHandler<?>> rpcMethod = mock(RpcMethod.class);
int maxConcurrentRequests = 2;
RpcHandler<RpcRequestHandler, Object, RpcResponseHandler<?>> rpcHandler =
new RpcHandler<>(asyncRunner, rpcMethod, maxConcurrentRequests);
new RpcHandler<>(asyncRunner, rpcMethod);

Connection connection = mock(Connection.class);
Session session = mock(Session.class);
Expand Down Expand Up @@ -249,39 +247,6 @@ void sendRequest_interruptBeforeInitialPayloadWritten(
verify(stream).close();
}

@Test
@SuppressWarnings("FutureReturnValueIgnored")
void requestIsThrottledIfQueueIsFull() {
// fill the queue
IntStream.range(0, maxConcurrentRequests)
.forEach(__ -> rpcHandler.sendRequest(connection, request, responseHandler));

final StreamPromise<Controller<RpcRequestHandler>> streamPromise1 =
new StreamPromise<>(new CompletableFuture<>(), new CompletableFuture<>());
when(session.createStream((ProtocolBinding<Controller<RpcRequestHandler>>) any()))
.thenReturn(streamPromise1);
final Stream stream1 = mock(Stream.class);
streamPromise1.getStream().complete(stream1);
streamPromise1.getController().complete(controller);
final CompletableFuture<String> protocolIdFuture1 = new CompletableFuture<>();
when(stream1.getProtocol()).thenReturn(protocolIdFuture1);
protocolIdFuture1.complete("test");

final SafeFuture<RpcStreamController<RpcRequestHandler>> throttledResult =
rpcHandler.sendRequest(connection, request, responseHandler);

assertThat(throttledResult).isNotDone();

// empty the queue
streamPromise.getStream().complete(stream);
streamPromise.getController().complete(controller);
stream.getProtocol().complete("test");
writeFuture.complete(null);

// throttled request should have completed now
assertThat(throttledResult).isCompleted();
}

@SuppressWarnings("UnnecessaryAsync")
private Class<? extends Exception> executeInterrupts(
final boolean closeStream, final boolean exceedTimeout) {
Expand Down

0 comments on commit 6cba4f6

Please sign in to comment.