Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevents memory accumulation from informers #5627

Merged
merged 1 commit into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

* Fix #5580: [java-generator] Correctly handle defaults for IntOrString types
* Fix #5584: Fix CRD generation when EnumMap is used
* Fix #5626: Prevent memory accumulation from informer usage

#### Improvements
* Fix #5429: moved crd generator annotations to generator-annotations instead of crd-generator-api. Using generator-annotations introduces no transitive dependencies.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.net.http.HttpClient.Redirect;
import java.net.http.HttpClient.Version;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.net.ssl.SSLParameters;

Expand All @@ -48,7 +49,7 @@ public JdkHttpClientBuilderImpl(JdkHttpClientFactory factory) {
@Override
public HttpClient build() {
if (client != null) {
return new JdkHttpClientImpl(this, client.getHttpClient());
return new JdkHttpClientImpl(this, client.getHttpClient(), client.getClosed());
}
java.net.http.HttpClient.Builder builder = clientFactory.createNewHttpClientBuilder();
if (connectTimeout != null && !java.time.Duration.ZERO.equals(connectTimeout)) {
Expand Down Expand Up @@ -78,7 +79,7 @@ public HttpClient build() {
Arrays.asList(tlsVersions).stream().map(TlsVersion::javaName).toArray(String[]::new)));
}
clientFactory.additionalConfig(builder);
return new JdkHttpClientImpl(this, builder.build());
return new JdkHttpClientImpl(this, builder.build(), new AtomicBoolean());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static io.fabric8.kubernetes.client.http.StandardHttpHeaders.CONTENT_TYPE;
Expand Down Expand Up @@ -223,13 +224,13 @@ public Optional<HttpResponse<?>> previousResponse() {

private java.net.http.HttpClient httpClient;

public JdkHttpClientImpl(JdkHttpClientBuilderImpl builder, HttpClient httpClient) {
super(builder);
public JdkHttpClientImpl(JdkHttpClientBuilderImpl builder, HttpClient httpClient, AtomicBoolean closed) {
super(builder, closed);
this.httpClient = httpClient;
}

@Override
public void close() {
public void doClose() {
if (this.httpClient == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.fabric8.kubernetes.client.http.BufferUtil.copy;
import static io.fabric8.kubernetes.client.http.StandardMediaTypes.APPLICATION_OCTET_STREAM;
Expand All @@ -61,13 +62,18 @@ public class JettyHttpClient extends StandardHttpClient<JettyHttpClient, JettyHt

public JettyHttpClient(StandardHttpClientBuilder<JettyHttpClient, JettyHttpClientFactory, JettyHttpClientBuilder> builder,
HttpClient jetty, WebSocketClient jettyWs) {
super(builder);
this(builder, jetty, jettyWs, new AtomicBoolean());
}

public JettyHttpClient(StandardHttpClientBuilder<JettyHttpClient, JettyHttpClientFactory, JettyHttpClientBuilder> builder,
HttpClient jetty, WebSocketClient jettyWs, AtomicBoolean closed) {
super(builder, closed);
this.jetty = jetty;
this.jettyWs = jettyWs;
}

@Override
public void close() {
public void doClose() {
try {
jetty.stop();
jettyWs.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public JettyHttpClientBuilder(JettyHttpClientFactory clientFactory) {
@Override
public JettyHttpClient build() {
if (client != null) {
return new JettyHttpClient(this, client.getJetty(), client.getJettyWs());
return new JettyHttpClient(this, client.getJetty(), client.getJettyWs(), client.getClosed());
}
final var sslContextFactory = new SslContextFactory.Client();
if (sslContext != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.net.Proxy;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.net.ssl.X509TrustManager;

Expand Down Expand Up @@ -119,7 +120,7 @@ private OkHttpClientImpl completeBuild(okhttp3.OkHttpClient.Builder builder, boo

OkHttpClient client = builder.build();

return new OkHttpClientImpl(client, this);
return new OkHttpClientImpl(client, this, new AtomicBoolean());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

public class OkHttpClientImpl extends StandardHttpClient<OkHttpClientImpl, OkHttpClientFactory, OkHttpClientBuilderImpl> {
Expand Down Expand Up @@ -241,13 +242,13 @@ public Map<String, List<String>> headers() {

private final okhttp3.OkHttpClient httpClient;

public OkHttpClientImpl(OkHttpClient client, OkHttpClientBuilderImpl builder) {
super(builder);
public OkHttpClientImpl(OkHttpClient client, OkHttpClientBuilderImpl builder, AtomicBoolean closed) {
super(builder, closed);
this.httpClient = client;
}

@Override
public void close() {
public void doClose() {
ConnectionPool connectionPool = httpClient.connectionPool();

Dispatcher dispatcher = httpClient.dispatcher();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.fabric8.kubernetes.client.vertx.VertxHttpRequest.toHeadersMap;

Expand All @@ -43,8 +44,8 @@ public class VertxHttpClient<F extends io.fabric8.kubernetes.client.http.HttpCli
private final Vertx vertx;
private final HttpClient client;

VertxHttpClient(VertxHttpClientBuilder<F> vertxHttpClientBuilder, HttpClient client) {
super(vertxHttpClientBuilder);
VertxHttpClient(VertxHttpClientBuilder<F> vertxHttpClientBuilder, HttpClient client, AtomicBoolean closed) {
super(vertxHttpClientBuilder, closed);
this.vertx = vertxHttpClientBuilder.vertx;
this.client = client;
}
Expand Down Expand Up @@ -119,7 +120,7 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeBytesDirect(StandardHtt
}

@Override
public void close() {
public void doClose() {
client.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class VertxHttpClientBuilder<F extends HttpClient.Factory>
Expand All @@ -52,7 +53,7 @@ public VertxHttpClientBuilder(F clientFactory, Vertx vertx) {
@Override
public VertxHttpClient<F> build() {
if (this.client != null) {
return new VertxHttpClient<>(this, this.client.getClient());
return new VertxHttpClient<>(this, this.client.getClient(), this.client.getClosed());
}

WebClientOptions options = new WebClientOptions();
Expand Down Expand Up @@ -115,7 +116,7 @@ public SslContextFactory sslContextFactory() {
}
});
}
return new VertxHttpClient<>(this, vertx.createHttpClient(options));
return new VertxHttpClient<>(this, vertx.createHttpClient(options), new AtomicBoolean());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,6 @@ interface Builder extends DerivedClientBuilder {

HttpRequest.Builder newHttpRequestBuilder();

boolean isClosed();

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -52,9 +53,11 @@ public abstract class StandardHttpClient<C extends HttpClient, F extends HttpCli
private static final Logger LOG = LoggerFactory.getLogger(StandardHttpClient.class);

protected StandardHttpClientBuilder<C, F, T> builder;
protected AtomicBoolean closed;

protected StandardHttpClient(StandardHttpClientBuilder<C, F, T> builder) {
protected StandardHttpClient(StandardHttpClientBuilder<C, F, T> builder, AtomicBoolean closed) {
this.builder = builder;
this.closed = closed;
}

public abstract CompletableFuture<WebSocketResponse> buildWebSocketDirect(
Expand Down Expand Up @@ -280,4 +283,22 @@ public <V> V getTag(Class<V> type) {
return type.cast(builder.tags.get(type));
}

@Override
final public void close() {
if (closed.compareAndSet(false, true)) {
doClose();
}
}

protected abstract void doClose();

@Override
public boolean isClosed() {
return closed.get();
}

public AtomicBoolean getClosed() {
return closed;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -265,4 +265,18 @@ public static Stream<Arguments> testRetryAfterParsingData() {
10000));
}

@Test
void testIsClosed() {
client.close();
assertTrue(client.isClosed());
}

@Test
void testDerivedIsClosed() {
TestStandardHttpClient childClient = client.newBuilder().connectTimeout(0, TimeUnit.SECONDS).build();
childClient.close();
assertTrue(childClient.isClosed());
assertTrue(client.isClosed());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class TestStandardHttpClient
extends StandardHttpClient<TestStandardHttpClient, TestStandardHttpClientFactory, TestStandardHttpClientBuilder> {
Expand All @@ -39,15 +40,15 @@ public class TestStandardHttpClient
@Getter
private final List<RecordedConsumeBytesDirect> recordedConsumeBytesDirects;

protected TestStandardHttpClient(TestStandardHttpClientBuilder builder) {
super(builder);
protected TestStandardHttpClient(TestStandardHttpClientBuilder builder, AtomicBoolean closed) {
super(builder, closed);
expectations = new HashMap<>();
recordedBuildWebSocketDirects = new ArrayList<>();
recordedConsumeBytesDirects = new ArrayList<>();
}

@Override
public void close() {
public void doClose() {
recordedConsumeBytesDirects.clear();
recordedBuildWebSocketDirects.clear();
expectations.values().forEach(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package io.fabric8.kubernetes.client.http;

import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class TestStandardHttpClientBuilder
extends StandardHttpClientBuilder<TestStandardHttpClient, TestStandardHttpClientFactory, TestStandardHttpClientBuilder> {
Expand All @@ -30,7 +32,8 @@ protected TestStandardHttpClientBuilder(TestStandardHttpClientFactory clientFact

@Override
public TestStandardHttpClient build() {
final TestStandardHttpClient instance = new TestStandardHttpClient(this);
final TestStandardHttpClient instance = new TestStandardHttpClient(this,
Optional.ofNullable(instances.peek()).map(TestStandardHttpClient::getClosed).orElse(new AtomicBoolean()));
instances.add(instance);
return instance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ void scheduleReconnect(WatchRequestState state) {

synchronized void reconnect() {
try {
if (client.isClosed()) {
logger.debug("The client has closed, closing the watch");
this.close();
return;
}
startWatch();
if (isForceClosed()) {
closeRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,13 @@ private DefaultSharedIndexInformer<T, L> createInformer(long resync, Executor ex
if (indexers != null) {
informer.addIndexers(indexers);
}
this.context.getClient().adapt(BaseClient.class).getClosed().whenComplete((closed, ignored) -> informer.stop());
informer.started().whenComplete((ignored, throwable) -> {
if (throwable == null) {
BaseClient baseClient = this.context.getClient().adapt(BaseClient.class);
baseClient.addToCloseable(informer);
informer.stopped().whenComplete((x, y) -> baseClient.removeFromCloseable(informer));
}
});
return informer;
}

Expand Down
Loading
Loading