Skip to content
This repository has been archived by the owner on Aug 1, 2023. It is now read-only.

Commit

Permalink
Merge pull request #497 from atoulme/max_clients
Browse files Browse the repository at this point in the history
Allow to configure max clients of rlpx service
  • Loading branch information
atoulme authored Jan 29, 2023
2 parents 0257d99 + 0235f60 commit 8fa5090
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class ConnectToAnotherNodeTest {
@Disabled
@Test
fun testCollectHeaders(@LuceneIndexWriter writer: IndexWriter, @VertxInstance vertx: Vertx) = runBlocking {
val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/mainnet.json").readAllBytes()
val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/mainnet.json")!!.readAllBytes()
val genesisFile = GenesisFile.read(contents)
val genesisBlock = genesisFile.toBlock()

Expand Down Expand Up @@ -88,6 +88,7 @@ class ConnectToAnotherNodeTest {
)
),
"Tuweni Experiment 0.1",
10,
meter
)
service.start().await()
Expand Down Expand Up @@ -116,7 +117,7 @@ class ConnectToAnotherNodeTest {
@Disabled("flaky")
@Test
fun twoServers(@LuceneIndexWriter writer: IndexWriter, @VertxInstance vertx: Vertx) = runBlocking {
val contents = EthHandlerTest::class.java.getResourceAsStream("/mainnet.json").readAllBytes()
val contents = EthHandlerTest::class.java.getResourceAsStream("/mainnet.json")!!.readAllBytes()
val genesisBlock = GenesisFile.read(contents).toBlock()

val repository = BlockchainRepository.init(
Expand Down Expand Up @@ -150,6 +151,7 @@ class ConnectToAnotherNodeTest {
)
),
"Tuweni Experiment 0.1",
10,
meter
)

Expand Down Expand Up @@ -185,6 +187,7 @@ class ConnectToAnotherNodeTest {
)
),
"Tuweni Experiment 0.1",
10,
meter
)
val result = AsyncCompletion.allOf(service.start(), service2.start()).then {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class SendPendingTransactionsTest {

@Test
fun testSendPendingTransactions(@LuceneIndexWriter writer: IndexWriter, @VertxInstance vertx: Vertx) = runBlocking {
val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/besu-dev.json").readAllBytes()
val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/besu-dev.json")!!.readAllBytes()
val genesisFile = GenesisFile.read(contents)
val genesisBlock = genesisFile.toBlock()

Expand Down Expand Up @@ -97,6 +97,7 @@ class SendPendingTransactionsTest {
)
),
"Tuweni Experiment 0.1",
10,
meter
)
service.start().await()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class SendDataToAnotherNodeTest {
ProxySubprotocol()
),
"Tuweni Experiment 0.1",
10,
meter
)

Expand All @@ -66,6 +67,7 @@ class SendDataToAnotherNodeTest {
service2kp,
listOf(ProxySubprotocol()),
"Tuweni Experiment 0.1",
10,
meter
)
val recorder = RecordingClientHandler()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ class EthereumClient(
rlpxConfig.keyPair(),
listOf(ethSubprotocol, proxySubprotocol),
rlpxConfig.clientName(),
rlpxConfig.maxConnections(),
meter,
adapter
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class EthereumClientConfig(private var config: Configuration = Configuration.emp
RLPxServiceConfigurationImpl(
section,
sectionConfig.getString("clientName"),
sectionConfig.getInteger("maxConnections"),
sectionConfig.getInteger("port"),
sectionConfig.getString("networkInterface"),
sectionConfig.getInteger("advertisedPort"),
Expand Down Expand Up @@ -337,6 +338,12 @@ class EthereumClientConfig(private var config: Configuration = Configuration.emp
"Port to advertise in communications as the RLPx service port",
PropertyValidator.isValidPort()
)
rlpx.addInteger(
"maxConnections",
50,
"Maximum number of clients",
PropertyValidator.isGreaterOrEqual(1)
)
rlpx.addString("clientName", "Apache Tuweni", "Name of the Ethereum client", null)
rlpx.addString("repository", "default", "Name of the blockchain repository", null)
rlpx.addString("peerRepository", "default", "Peer repository to which records should go", null)
Expand Down Expand Up @@ -440,6 +447,7 @@ interface RLPxServiceConfiguration {
fun repository(): String
fun getName(): String
fun clientName(): String
fun maxConnections(): Int
fun peerRepository(): String
}

Expand Down Expand Up @@ -511,6 +519,7 @@ internal class PeerRepositoryConfigurationImpl(private val repoName: String, pri
internal data class RLPxServiceConfigurationImpl(
private val name: String,
val clientName: String,
val maxConnections: Int,
val port: Int,
val networkInterface: String,
val advertisedPort: Int,
Expand All @@ -534,6 +543,7 @@ internal data class RLPxServiceConfigurationImpl(
override fun getName(): String = name

override fun clientName(): String = clientName
override fun maxConnections(): Int = maxConnections

override fun peerRepository(): String = peerRepository
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class CrawlerApplication(
SECP256K1.KeyPair.random(),
listOf(ethHelloProtocol),
"Apache Tuweni network crawler",
50,
meter,
wireConnectionsRepository
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ void testTwoServicesSendingMessagesOfCustomSubProtocolToEachOther(@VertxInstance
kp,
Collections.singletonList(sp),
"Client 1",
10,
meter,
repository);
MemoryWireConnectionsRepository secondRepository = new MemoryWireConnectionsRepository();
Expand All @@ -137,6 +138,7 @@ void testTwoServicesSendingMessagesOfCustomSubProtocolToEachOther(@VertxInstance
secondKp,
Collections.singletonList(secondSp),
"Client 2",
10,
meter,
secondRepository);
service.start().join();
Expand Down Expand Up @@ -180,6 +182,7 @@ void testTwoServicesSendingMessagesOfCustomSubProtocolToEachOtherSimultaneously(
kp,
Collections.singletonList(sp),
"Client 1",
10,
meter,
repository);
VertxRLPxService secondService = new VertxRLPxService(
Expand All @@ -190,6 +193,7 @@ void testTwoServicesSendingMessagesOfCustomSubProtocolToEachOtherSimultaneously(
secondKp,
Collections.singletonList(secondSp),
"Client 2",
10,
meter,
secondRepository);
service.start().join();
Expand Down Expand Up @@ -282,7 +286,7 @@ public SubProtocolHandler createHandler(RLPxService service, SubProtocolClient c
public SubProtocolClient createClient(RLPxService service, SubProtocolIdentifier identifier) {
return null;
}
}), "Client 1", meter, repository);
}), "Client 1", 10, meter, repository);
service.start().join();

AsyncResult<WireConnection> completion = service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ void invalidPort(@VertxInstance Vertx vertx) {
SECP256K1.KeyPair.random(),
new ArrayList<>(),
"a",
10,
meter));
}

Expand All @@ -80,6 +81,7 @@ void invalidAdvertisedPort(@VertxInstance Vertx vertx) {
SECP256K1.KeyPair.random(),
new ArrayList<>(),
"a",
10,
meter));
}

Expand All @@ -95,6 +97,23 @@ void invalidClientId(@VertxInstance Vertx vertx) {
SECP256K1.KeyPair.random(),
new ArrayList<>(),
null,
10,
meter));
}

@Test
void invalidMaxConnections(@VertxInstance Vertx vertx) {
assertThrows(
IllegalArgumentException.class,
() -> new VertxRLPxService(
vertx,
34,
"localhost",
23,
SECP256K1.KeyPair.random(),
new ArrayList<>(),
"foo",
-1,
meter));
}

Expand All @@ -110,6 +129,7 @@ void invalidClientIdSpaces(@VertxInstance Vertx vertx) {
SECP256K1.KeyPair.random(),
new ArrayList<>(),
" ",
10,
meter));
}

Expand All @@ -123,6 +143,7 @@ void startAndStopService(@VertxInstance Vertx vertx) throws InterruptedException
SECP256K1.KeyPair.random(),
new ArrayList<>(),
"a",
10,
meter);

service.start().join();
Expand All @@ -136,7 +157,7 @@ void startAndStopService(@VertxInstance Vertx vertx) throws InterruptedException
@Test
void startServiceWithPortZero(@VertxInstance Vertx vertx) throws InterruptedException {
VertxRLPxService service =
new VertxRLPxService(vertx, 0, "localhost", 0, SECP256K1.KeyPair.random(), new ArrayList<>(), "a", meter);
new VertxRLPxService(vertx, 0, "localhost", 0, SECP256K1.KeyPair.random(), new ArrayList<>(), "a", 10, meter);

service.start().join();
try {
Expand All @@ -149,8 +170,16 @@ void startServiceWithPortZero(@VertxInstance Vertx vertx) throws InterruptedExce

@Test
void stopServiceWithoutStartingItFirst(@VertxInstance Vertx vertx) {
VertxRLPxService service =
new VertxRLPxService(vertx, 0, "localhost", 10000, SECP256K1.KeyPair.random(), new ArrayList<>(), "abc", meter);
VertxRLPxService service = new VertxRLPxService(
vertx,
0,
"localhost",
10000,
SECP256K1.KeyPair.random(),
new ArrayList<>(),
"abc",
10,
meter);
AsyncCompletion completion = service.stop();
assertTrue(completion.isDone());
}
Expand All @@ -160,11 +189,11 @@ void connectToOtherPeerWithNoSubProtocols(@VertxInstance Vertx vertx) throws Exc
SECP256K1.KeyPair ourPair = SECP256K1.KeyPair.random();
SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
VertxRLPxService service =
new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, new ArrayList<>(), "abc", meter);
new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, new ArrayList<>(), "abc", 10, meter);
service.start().join();

VertxRLPxService peerService =
new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new ArrayList<>(), "abc", meter);
new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new ArrayList<>(), "abc", 10, meter);
peerService.start().join();

WireConnection conn =
Expand All @@ -179,12 +208,12 @@ void disconnectAfterStop(@VertxInstance Vertx vertx) throws Exception {
SECP256K1.KeyPair ourPair = SECP256K1.KeyPair.random();
SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
List<SubProtocol> protocols = Arrays.asList(new VertxAcceptanceTest.MyCustomSubProtocol());
VertxRLPxService service = new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", meter);
VertxRLPxService service = new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", 10, meter);
service.start().join();


VertxRLPxService peerService =
new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", meter);
new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", 10, meter);
peerService.start().join();

WireConnection conn = null;
Expand Down Expand Up @@ -234,12 +263,12 @@ public SubProtocolClient createClient(RLPxService service, SubProtocolIdentifier
}
});
VertxRLPxService service =
new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", meter, repository);
new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", 10, meter, repository);
service.start().join();

MemoryWireConnectionsRepository peerRepository = new MemoryWireConnectionsRepository();
VertxRLPxService peerService =
new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", meter, peerRepository);
new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", 10, meter, peerRepository);
peerService.start().join();

try {
Expand All @@ -263,8 +292,17 @@ public SubProtocolClient createClient(RLPxService service, SubProtocolIdentifier
void getClientWhenNotReady(@VertxInstance Vertx vertx) {
SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
MemoryWireConnectionsRepository peerRepository = new MemoryWireConnectionsRepository();
VertxRLPxService peerService =
new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new ArrayList<>(), "abc", meter, peerRepository);
VertxRLPxService peerService = new VertxRLPxService(
vertx,
0,
"localhost",
10000,
peerPair,
new ArrayList<>(),
"abc",
10,
meter,
peerRepository);
assertThrows(IllegalStateException.class, () -> {
peerService.getClient(SubProtocolIdentifier.of("foo", 1));
});
Expand All @@ -286,6 +324,7 @@ void getClientWeCreate(@VertxInstance Vertx vertx) throws Exception {
SECP256K1.KeyPair.random(),
Collections.singletonList(sp),
"abc",
10,
meter,
peerRepository);
peerService.start().join();
Expand Down
Loading

0 comments on commit 8fa5090

Please sign in to comment.