Skip to content
This repository has been archived by the owner on Aug 1, 2023. It is now read-only.

Allow to configure max clients of rlpx service #497

Merged
merged 1 commit into from
Jan 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class ConnectToAnotherNodeTest {
@Disabled
@Test
fun testCollectHeaders(@LuceneIndexWriter writer: IndexWriter, @VertxInstance vertx: Vertx) = runBlocking {
val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/mainnet.json").readAllBytes()
val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/mainnet.json")!!.readAllBytes()
val genesisFile = GenesisFile.read(contents)
val genesisBlock = genesisFile.toBlock()

Expand Down Expand Up @@ -88,6 +88,7 @@ class ConnectToAnotherNodeTest {
)
),
"Tuweni Experiment 0.1",
10,
meter
)
service.start().await()
Expand Down Expand Up @@ -116,7 +117,7 @@ class ConnectToAnotherNodeTest {
@Disabled("flaky")
@Test
fun twoServers(@LuceneIndexWriter writer: IndexWriter, @VertxInstance vertx: Vertx) = runBlocking {
val contents = EthHandlerTest::class.java.getResourceAsStream("/mainnet.json").readAllBytes()
val contents = EthHandlerTest::class.java.getResourceAsStream("/mainnet.json")!!.readAllBytes()
val genesisBlock = GenesisFile.read(contents).toBlock()

val repository = BlockchainRepository.init(
Expand Down Expand Up @@ -150,6 +151,7 @@ class ConnectToAnotherNodeTest {
)
),
"Tuweni Experiment 0.1",
10,
meter
)

Expand Down Expand Up @@ -185,6 +187,7 @@ class ConnectToAnotherNodeTest {
)
),
"Tuweni Experiment 0.1",
10,
meter
)
val result = AsyncCompletion.allOf(service.start(), service2.start()).then {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class SendPendingTransactionsTest {

@Test
fun testSendPendingTransactions(@LuceneIndexWriter writer: IndexWriter, @VertxInstance vertx: Vertx) = runBlocking {
val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/besu-dev.json").readAllBytes()
val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/besu-dev.json")!!.readAllBytes()
val genesisFile = GenesisFile.read(contents)
val genesisBlock = genesisFile.toBlock()

Expand Down Expand Up @@ -97,6 +97,7 @@ class SendPendingTransactionsTest {
)
),
"Tuweni Experiment 0.1",
10,
meter
)
service.start().await()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class SendDataToAnotherNodeTest {
ProxySubprotocol()
),
"Tuweni Experiment 0.1",
10,
meter
)

Expand All @@ -66,6 +67,7 @@ class SendDataToAnotherNodeTest {
service2kp,
listOf(ProxySubprotocol()),
"Tuweni Experiment 0.1",
10,
meter
)
val recorder = RecordingClientHandler()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ class EthereumClient(
rlpxConfig.keyPair(),
listOf(ethSubprotocol, proxySubprotocol),
rlpxConfig.clientName(),
rlpxConfig.maxConnections(),
meter,
adapter
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class EthereumClientConfig(private var config: Configuration = Configuration.emp
RLPxServiceConfigurationImpl(
section,
sectionConfig.getString("clientName"),
sectionConfig.getInteger("maxConnections"),
sectionConfig.getInteger("port"),
sectionConfig.getString("networkInterface"),
sectionConfig.getInteger("advertisedPort"),
Expand Down Expand Up @@ -337,6 +338,12 @@ class EthereumClientConfig(private var config: Configuration = Configuration.emp
"Port to advertise in communications as the RLPx service port",
PropertyValidator.isValidPort()
)
rlpx.addInteger(
"maxConnections",
50,
"Maximum number of clients",
PropertyValidator.isGreaterOrEqual(1)
)
rlpx.addString("clientName", "Apache Tuweni", "Name of the Ethereum client", null)
rlpx.addString("repository", "default", "Name of the blockchain repository", null)
rlpx.addString("peerRepository", "default", "Peer repository to which records should go", null)
Expand Down Expand Up @@ -430,6 +437,7 @@ interface RLPxServiceConfiguration {
fun repository(): String
fun getName(): String
fun clientName(): String
fun maxConnections(): Int
fun peerRepository(): String
}

Expand Down Expand Up @@ -501,6 +509,7 @@ internal class PeerRepositoryConfigurationImpl(private val repoName: String, pri
internal data class RLPxServiceConfigurationImpl(
private val name: String,
val clientName: String,
val maxConnections: Int,
val port: Int,
val networkInterface: String,
val advertisedPort: Int,
Expand All @@ -524,6 +533,7 @@ internal data class RLPxServiceConfigurationImpl(
override fun getName(): String = name

override fun clientName(): String = clientName
override fun maxConnections(): Int = maxConnections

override fun peerRepository(): String = peerRepository
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class CrawlerApplication(
SECP256K1.KeyPair.random(),
listOf(ethHelloProtocol),
"Apache Tuweni network crawler",
50,
meter,
wireConnectionsRepository
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ void testTwoServicesSendingMessagesOfCustomSubProtocolToEachOther(@VertxInstance
kp,
Collections.singletonList(sp),
"Client 1",
10,
meter,
repository);
MemoryWireConnectionsRepository secondRepository = new MemoryWireConnectionsRepository();
Expand All @@ -137,6 +138,7 @@ void testTwoServicesSendingMessagesOfCustomSubProtocolToEachOther(@VertxInstance
secondKp,
Collections.singletonList(secondSp),
"Client 2",
10,
meter,
secondRepository);
service.start().join();
Expand Down Expand Up @@ -180,6 +182,7 @@ void testTwoServicesSendingMessagesOfCustomSubProtocolToEachOtherSimultaneously(
kp,
Collections.singletonList(sp),
"Client 1",
10,
meter,
repository);
VertxRLPxService secondService = new VertxRLPxService(
Expand All @@ -190,6 +193,7 @@ void testTwoServicesSendingMessagesOfCustomSubProtocolToEachOtherSimultaneously(
secondKp,
Collections.singletonList(secondSp),
"Client 2",
10,
meter,
secondRepository);
service.start().join();
Expand Down Expand Up @@ -282,7 +286,7 @@ public SubProtocolHandler createHandler(RLPxService service, SubProtocolClient c
public SubProtocolClient createClient(RLPxService service, SubProtocolIdentifier identifier) {
return null;
}
}), "Client 1", meter, repository);
}), "Client 1", 10, meter, repository);
service.start().join();

AsyncResult<WireConnection> completion = service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ void invalidPort(@VertxInstance Vertx vertx) {
SECP256K1.KeyPair.random(),
new ArrayList<>(),
"a",
10,
meter));
}

Expand All @@ -80,6 +81,7 @@ void invalidAdvertisedPort(@VertxInstance Vertx vertx) {
SECP256K1.KeyPair.random(),
new ArrayList<>(),
"a",
10,
meter));
}

Expand All @@ -95,6 +97,23 @@ void invalidClientId(@VertxInstance Vertx vertx) {
SECP256K1.KeyPair.random(),
new ArrayList<>(),
null,
10,
meter));
}

@Test
void invalidMaxConnections(@VertxInstance Vertx vertx) {
assertThrows(
IllegalArgumentException.class,
() -> new VertxRLPxService(
vertx,
34,
"localhost",
23,
SECP256K1.KeyPair.random(),
new ArrayList<>(),
"foo",
-1,
meter));
}

Expand All @@ -110,6 +129,7 @@ void invalidClientIdSpaces(@VertxInstance Vertx vertx) {
SECP256K1.KeyPair.random(),
new ArrayList<>(),
" ",
10,
meter));
}

Expand All @@ -123,6 +143,7 @@ void startAndStopService(@VertxInstance Vertx vertx) throws InterruptedException
SECP256K1.KeyPair.random(),
new ArrayList<>(),
"a",
10,
meter);

service.start().join();
Expand All @@ -136,7 +157,7 @@ void startAndStopService(@VertxInstance Vertx vertx) throws InterruptedException
@Test
void startServiceWithPortZero(@VertxInstance Vertx vertx) throws InterruptedException {
VertxRLPxService service =
new VertxRLPxService(vertx, 0, "localhost", 0, SECP256K1.KeyPair.random(), new ArrayList<>(), "a", meter);
new VertxRLPxService(vertx, 0, "localhost", 0, SECP256K1.KeyPair.random(), new ArrayList<>(), "a", 10, meter);

service.start().join();
try {
Expand All @@ -149,8 +170,16 @@ void startServiceWithPortZero(@VertxInstance Vertx vertx) throws InterruptedExce

@Test
void stopServiceWithoutStartingItFirst(@VertxInstance Vertx vertx) {
VertxRLPxService service =
new VertxRLPxService(vertx, 0, "localhost", 10000, SECP256K1.KeyPair.random(), new ArrayList<>(), "abc", meter);
VertxRLPxService service = new VertxRLPxService(
vertx,
0,
"localhost",
10000,
SECP256K1.KeyPair.random(),
new ArrayList<>(),
"abc",
10,
meter);
AsyncCompletion completion = service.stop();
assertTrue(completion.isDone());
}
Expand All @@ -160,11 +189,11 @@ void connectToOtherPeerWithNoSubProtocols(@VertxInstance Vertx vertx) throws Exc
SECP256K1.KeyPair ourPair = SECP256K1.KeyPair.random();
SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
VertxRLPxService service =
new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, new ArrayList<>(), "abc", meter);
new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, new ArrayList<>(), "abc", 10, meter);
service.start().join();

VertxRLPxService peerService =
new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new ArrayList<>(), "abc", meter);
new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new ArrayList<>(), "abc", 10, meter);
peerService.start().join();

WireConnection conn =
Expand All @@ -179,12 +208,12 @@ void disconnectAfterStop(@VertxInstance Vertx vertx) throws Exception {
SECP256K1.KeyPair ourPair = SECP256K1.KeyPair.random();
SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
List<SubProtocol> protocols = Arrays.asList(new VertxAcceptanceTest.MyCustomSubProtocol());
VertxRLPxService service = new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", meter);
VertxRLPxService service = new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", 10, meter);
service.start().join();


VertxRLPxService peerService =
new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", meter);
new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", 10, meter);
peerService.start().join();

WireConnection conn = null;
Expand Down Expand Up @@ -234,12 +263,12 @@ public SubProtocolClient createClient(RLPxService service, SubProtocolIdentifier
}
});
VertxRLPxService service =
new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", meter, repository);
new VertxRLPxService(vertx, 0, "localhost", 10000, ourPair, protocols, "abc", 10, meter, repository);
service.start().join();

MemoryWireConnectionsRepository peerRepository = new MemoryWireConnectionsRepository();
VertxRLPxService peerService =
new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", meter, peerRepository);
new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, protocols, "abc", 10, meter, peerRepository);
peerService.start().join();

try {
Expand All @@ -263,8 +292,17 @@ public SubProtocolClient createClient(RLPxService service, SubProtocolIdentifier
void getClientWhenNotReady(@VertxInstance Vertx vertx) {
SECP256K1.KeyPair peerPair = SECP256K1.KeyPair.random();
MemoryWireConnectionsRepository peerRepository = new MemoryWireConnectionsRepository();
VertxRLPxService peerService =
new VertxRLPxService(vertx, 0, "localhost", 10000, peerPair, new ArrayList<>(), "abc", meter, peerRepository);
VertxRLPxService peerService = new VertxRLPxService(
vertx,
0,
"localhost",
10000,
peerPair,
new ArrayList<>(),
"abc",
10,
meter,
peerRepository);
assertThrows(IllegalStateException.class, () -> {
peerService.getClient(SubProtocolIdentifier.of("foo", 1));
});
Expand All @@ -286,6 +324,7 @@ void getClientWeCreate(@VertxInstance Vertx vertx) throws Exception {
SECP256K1.KeyPair.random(),
Collections.singletonList(sp),
"abc",
10,
meter,
peerRepository);
peerService.start().join();
Expand Down
Loading