Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][client] Add maxConnectionsPerHost and connectionMaxIdleSeconds to PulsarAdminBuilder #22541

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c8295b7
[improve][client] Add connectionsPerHost and connectionMaxIdleSeconds…
lhotari Apr 19, 2024
8605273
Rename connectionsPerHost -> maxConnectionsPerHost
lhotari Apr 22, 2024
c0c01c2
Add test for specifying maxConnectionsPerHost with loadConf
lhotari Apr 22, 2024
f5e5eec
Upgrade Jersey to 2.42
lhotari Aug 2, 2024
18eee3d
Add completable-futures dependency
lhotari Aug 5, 2024
e03bb1f
Use ConcurrencyReducer to limit concurrency per host
lhotari Aug 6, 2024
24dc470
Handle redirects so that the concurrency limiter could work with redi…
lhotari Aug 7, 2024
b2e420d
Add MaxRedirectException
lhotari Aug 7, 2024
f798673
Remove redundant code
lhotari Aug 7, 2024
1f007a5
Support access directly to AsyncHttpClient which limits concurrency
lhotari Aug 7, 2024
7b2b93a
Fix test after modifying connectionMaxIdleSeconds minimum
lhotari Aug 7, 2024
c7f39a0
Specify acquireFreeChannelTimeout when maxConnectionsPerHost is set
lhotari Aug 7, 2024
4a2c01f
Fix CodeQL java/netty-http-request-or-response-splitting issue
lhotari Aug 7, 2024
12dd71d
Refactor configuration logic in AsyncHttpConnector
lhotari Aug 8, 2024
370c7af
Add test for max redirects
lhotari Aug 8, 2024
3469372
Add tests for redirects
lhotari Aug 8, 2024
609629c
Add comment about retries in AsyncHttpConnector
lhotari Aug 8, 2024
6a18bbb
Fix implementation of possibly unused "apply" method
lhotari Aug 8, 2024
ec9e724
Add test for max connections
lhotari Aug 8, 2024
e7c961a
Add Javadocs to AsyncHttpRequestExecutor
lhotari Aug 8, 2024
fb374eb
Unwrap exception
lhotari Aug 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,8 @@ The Apache Software License, Version 2.0
- io.opentelemetry.instrumentation-opentelemetry-runtime-telemetry-java17-1.33.3-alpha.jar
- io.opentelemetry.instrumentation-opentelemetry-runtime-telemetry-java8-1.33.3-alpha.jar
- io.opentelemetry.semconv-opentelemetry-semconv-1.25.0-alpha.jar
* Spotify completable-futures
- com.spotify-completable-futures-0.3.6.jar

BSD 3-clause "New" or "Revised" License
* Google auth library
Expand Down Expand Up @@ -580,15 +582,15 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
- org.glassfish.hk2-osgi-resource-locator-1.0.3.jar
- org.glassfish.hk2.external-aopalliance-repackaged-2.6.1.jar
* Jersey
- org.glassfish.jersey.containers-jersey-container-servlet-2.41.jar
- org.glassfish.jersey.containers-jersey-container-servlet-core-2.41.jar
- org.glassfish.jersey.core-jersey-client-2.41.jar
- org.glassfish.jersey.core-jersey-common-2.41.jar
- org.glassfish.jersey.core-jersey-server-2.41.jar
- org.glassfish.jersey.ext-jersey-entity-filtering-2.41.jar
- org.glassfish.jersey.media-jersey-media-json-jackson-2.41.jar
- org.glassfish.jersey.media-jersey-media-multipart-2.41.jar
- org.glassfish.jersey.inject-jersey-hk2-2.41.jar
- org.glassfish.jersey.containers-jersey-container-servlet-2.42.jar
- org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar
- org.glassfish.jersey.core-jersey-client-2.42.jar
- org.glassfish.jersey.core-jersey-common-2.42.jar
- org.glassfish.jersey.core-jersey-server-2.42.jar
- org.glassfish.jersey.ext-jersey-entity-filtering-2.42.jar
- org.glassfish.jersey.media-jersey-media-json-jackson-2.42.jar
- org.glassfish.jersey.media-jersey-media-multipart-2.42.jar
- org.glassfish.jersey.inject-jersey-hk2-2.42.jar
* Mimepull -- org.jvnet.mimepull-mimepull-1.9.15.jar

Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt
Expand Down
13 changes: 7 additions & 6 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ The Apache Software License, Version 2.0
- avro-1.11.3.jar
- avro-protobuf-1.11.3.jar
* RE2j -- re2j-1.7.jar
* Spotify completable-futures -- completable-futures-0.3.6.jar

BSD 3-clause "New" or "Revised" License
* JSR305 -- jsr305-3.0.2.jar -- ../licenses/LICENSE-JSR305.txt
Expand Down Expand Up @@ -446,12 +447,12 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
- aopalliance-repackaged-2.6.1.jar
- osgi-resource-locator-1.0.3.jar
* Jersey
- jersey-client-2.41.jar
- jersey-common-2.41.jar
- jersey-entity-filtering-2.41.jar
- jersey-media-json-jackson-2.41.jar
- jersey-media-multipart-2.41.jar
- jersey-hk2-2.41.jar
- jersey-client-2.42.jar
- jersey-common-2.42.jar
- jersey-entity-filtering-2.42.jar
- jersey-media-json-jackson-2.42.jar
- jersey-media-multipart-2.42.jar
- jersey-hk2-2.42.jar
* Mimepull -- mimepull-1.9.15.jar

Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt
Expand Down
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ flexible messaging model and an intuitive client API.</description>
<netty-iouring.version>0.0.24.Final</netty-iouring.version>
<jetty.version>9.4.54.v20240208</jetty.version>
<conscrypt.version>2.5.2</conscrypt.version>
<jersey.version>2.41</jersey.version>
<jersey.version>2.42</jersey.version>
<athenz.version>1.10.50</athenz.version>
<prometheus.version>0.16.0</prometheus.version>
<vertx.version>4.5.8</vertx.version>
Expand Down Expand Up @@ -266,6 +266,7 @@ flexible messaging model and an intuitive client API.</description>
<opentelemetry.semconv.version>1.25.0-alpha</opentelemetry.semconv.version>
<picocli.version>4.7.5</picocli.version>
<re2j.version>1.7</re2j.version>
<completable-futures.version>0.3.6</completable-futures.version>
<failsafe.version>3.3.2</failsafe.version>

<!-- test dependencies -->
Expand Down Expand Up @@ -665,6 +666,12 @@ flexible messaging model and an intuitive client API.</description>
<version>${re2j.version}</version>
</dependency>

<dependency>
<groupId>com.spotify</groupId>
<artifactId>completable-futures</artifactId>
<version>${completable-futures.version}</version>
</dependency>

<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,4 +336,30 @@ PulsarAdminBuilder authentication(String authPluginClassName, Map<String, String
* requests
*/
PulsarAdminBuilder acceptGzipCompression(boolean acceptGzipCompression);

/**
* Configures the maximum number of connections that the client library will establish with a single host.
* <p>
* By default, the connection pool maintains up to 16 connections to a single host. This method allows you to
* modify this default behavior and limit the number of connections.
* <p>
* This setting can be useful in scenarios where you want to limit the resources used by the client library,
* or control the level of parallelism for operations so that a single client does not overwhelm
* the Pulsar cluster with too many concurrent connections.
*
* @param maxConnectionsPerHost the maximum number of connections to establish per host. Set to <= 0 to disable
* the limit.
* @return the PulsarAdminBuilder instance, allowing for method chaining
*/
PulsarAdminBuilder maxConnectionsPerHost(int maxConnectionsPerHost);

/**
* Sets the maximum idle time for a pooled connection. If a connection is idle for more than the specified
* amount of seconds, it will be released back to the connection pool.
* Defaults to 25 seconds.
*
* @param connectionMaxIdleSeconds the maximum idle time, in seconds, for a pooled connection
* @return the PulsarAdminBuilder instance
*/
PulsarAdminBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds);
}
5 changes: 5 additions & 0 deletions pulsar-client-admin-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
<include>com.google.guava:guava</include>
<include>com.google.code.gson:gson</include>
<include>com.google.re2j:re2j</include>
<include>com.spotify:completable-futures</include>
<include>com.fasterxml.jackson.*:*</include>
<include>io.netty:*</include>
<include>io.netty.incubator:*</include>
Expand Down Expand Up @@ -192,6 +193,10 @@
<exclude>com.google.protobuf.*</exclude>
</excludes>
</relocation>
<relocation>
<pattern>com.spotify.futures</pattern>
<shadedPattern>org.apache.pulsar.shade.com.spotify.futures</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml.jackson</pattern>
<shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.asynchttpclient.Dsl.post;
import static org.asynchttpclient.Dsl.put;
import com.google.gson.Gson;
import io.netty.handler.codec.http.HttpHeaders;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
Expand All @@ -41,6 +40,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
Expand All @@ -54,10 +54,8 @@
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncCompletionHandlerBase;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.request.body.multipart.ByteArrayPart;
import org.asynchttpclient.request.body.multipart.FilePart;
Expand All @@ -70,12 +68,14 @@
public class FunctionsImpl extends ComponentResource implements Functions {

private final WebTarget functions;
private final AsyncHttpClient asyncHttpClient;
private final AsyncHttpRequestExecutor asyncHttpRequestExecutor;

public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long requestTimeoutMs) {
public FunctionsImpl(WebTarget web, Authentication auth,
AsyncHttpRequestExecutor asyncHttpRequestExecutor,
long requestTimeoutMs) {
super(auth, requestTimeoutMs);
this.functions = web.path("/admin/v3/functions");
this.asyncHttpClient = asyncHttpClient;
this.asyncHttpRequestExecutor = asyncHttpRequestExecutor;
}

@Override
Expand Down Expand Up @@ -171,8 +171,7 @@ public CompletableFuture<Void> createFunctionAsync(FunctionConfig functionConfig
// If the function code is built in, we don't need to submit here
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
.toCompletableFuture()
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
future.completeExceptionally(
Expand Down Expand Up @@ -263,8 +262,7 @@ public CompletableFuture<Void> updateFunctionAsync(
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}

asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
.toCompletableFuture()
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
future.completeExceptionally(
Expand Down Expand Up @@ -464,7 +462,7 @@ public CompletableFuture<Void> uploadFunctionAsync(String sourceFile, String pat
.addBodyPart(new FilePart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM))
.addBodyPart(new StringPart("path", path, MediaType.TEXT_PLAIN));

asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture()
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
future.completeExceptionally(
Expand Down Expand Up @@ -543,55 +541,31 @@ private CompletableFuture<Void> downloadFileAsync(String destinationPath, WebTar

RequestBuilder builder = get(target.getUri().toASCIIString());

CompletableFuture<HttpResponseStatus> statusFuture =
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build(),
new AsyncHandler<HttpResponseStatus>() {
private HttpResponseStatus status;

@Override
public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
status = responseStatus;
if (status.getStatusCode() != Response.Status.OK.getStatusCode()) {
return State.ABORT;
}
return State.CONTINUE;
}

@Override
public State onHeadersReceived(HttpHeaders headers) throws Exception {
return State.CONTINUE;
}
CompletableFuture<org.asynchttpclient.Response> responseFuture =
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build(),
() -> new AsyncCompletionHandlerBase() {

@Override
public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
os.write(bodyPart.getBodyByteBuffer());
return State.CONTINUE;
}
});

@Override
public HttpResponseStatus onCompleted() throws Exception {
return status;
}

@Override
public void onThrowable(Throwable t) {
}
}).toCompletableFuture();

statusFuture
.whenComplete((status, throwable) -> {
responseFuture
.whenComplete((response, throwable) -> {
try {
os.close();
} catch (IOException e) {
future.completeExceptionally(getApiException(e));
}
})
.thenAccept(status -> {
if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) {
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
future.completeExceptionally(
getApiException(Response
.status(status.getStatusCode())
.entity(status.getStatusText())
.status(response.getStatusCode())
.entity(response.getStatusText())
.build()));
} else {
future.complete(null);
Expand Down Expand Up @@ -700,7 +674,7 @@ public CompletableFuture<Void> putFunctionStateAsync(
.path("state").path(state.getKey()).getUri().toASCIIString());
builder.addBodyPart(new StringPart("state", objectWriter()
.writeValueAsString(state), MediaType.APPLICATION_JSON));
asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
.toCompletableFuture()
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
Expand Down Expand Up @@ -740,7 +714,7 @@ public CompletableFuture<Void> updateOnWorkerLeaderAsync(String tenant, String n
.addBodyPart(new ByteArrayPart("functionMetaData", functionMetaData))
.addBodyPart(new StringPart("delete", Boolean.toString(delete)));

asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build())
.toCompletableFuture()
.thenAccept(response -> {
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
Expand Down
Loading
Loading