diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index ff73a6d4fce..1b2f0ea938b 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -21,6 +21,7 @@ import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; import okhttp3.OkHttpClient; +import okhttp3.Request; import okhttp3.WebSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,9 +38,7 @@ public abstract class AbstractWatchManager implements Watch { private static final Logger logger = LoggerFactory.getLogger(AbstractWatchManager.class); final Watcher watcher; - final ListOptions listOptions; final AtomicReference resourceVersion; - final OkHttpClient clonedClient; final AtomicBoolean forceClosed; private final int reconnectLimit; @@ -47,18 +46,18 @@ public abstract class AbstractWatchManager implements Watch { private final int maxIntervalExponent; final AtomicInteger currentReconnectAttempt; private final ScheduledExecutorService executorService; + + private final RequestBuilder requestBuilder; + protected ClientRunner runner; AbstractWatchManager( - Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, - OkHttpClient clonedClient + Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, RequestBuilder requestBuilder ) { this.watcher = watcher; - this.listOptions = listOptions; this.reconnectLimit = reconnectLimit; this.reconnectInterval = reconnectInterval; this.maxIntervalExponent = maxIntervalExponent; - this.clonedClient = clonedClient; this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion()); this.currentReconnectAttempt = new AtomicInteger(0); this.forceClosed = new AtomicBoolean(); @@ -67,6 +66,15 @@ public abstract class AbstractWatchManager implements Watch { ret.setDaemon(true); return ret; }); + + this.requestBuilder = requestBuilder; + } + + protected void initRunner(ClientRunner runner) { + if (this.runner != null) { + throw new IllegalStateException("ClientRunner has already been initialized"); + } + this.runner = runner; } final void closeEvent(WatcherException cause) { @@ -124,6 +132,37 @@ final long nextReconnectInterval() { logger.debug("Current reconnect backoff is {} milliseconds (T{})", ret, exponentOfTwo); return ret; } + + void resetReconnectAttempts() { + currentReconnectAttempt.set(0); + } + + boolean isForceClosed() { + return forceClosed.get(); + } + + void eventReceived(Watcher.Action action, T resource) { + watcher.eventReceived(action, resource); + } + + void onClose(WatcherException cause) { + watcher.onClose(cause); + } + + void updateResourceVersion(final String newResourceVersion) { + resourceVersion.set(newResourceVersion); + } + + protected void runWatch() { + final Request request = requestBuilder.build(resourceVersion.get()); + logger.debug("Watching {}...", request.url()); + + runner.run(request); + } + + public void waitUntilReady() { + runner.waitUntilReady(); + } static void closeWebSocket(WebSocket webSocket) { if (webSocket != null) { @@ -137,4 +176,33 @@ static void closeWebSocket(WebSocket webSocket) { } } } + + @Override + public void close() { + logger.debug("Force closing the watch {}", this); + closeEvent(); + runner.close(); + closeExecutorService(); + } + + @FunctionalInterface + interface RequestBuilder { + Request build(final String resourceVersion); + } + + abstract static class ClientRunner { + private final OkHttpClient client; + + protected ClientRunner(OkHttpClient client) { + this.client = cloneAndCustomize(client); + } + + abstract void run(Request request); + void close() {} + void waitUntilReady() {} + abstract OkHttpClient cloneAndCustomize(OkHttpClient client); + OkHttpClient client() { + return client; + } + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java new file mode 100644 index 00000000000..5a8936bbf07 --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.dsl.internal; + +import java.net.MalformedURLException; +import java.net.URL; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.KubernetesResourceList; +import io.fabric8.kubernetes.api.model.ListOptions; +import io.fabric8.kubernetes.client.dsl.base.BaseOperation; +import io.fabric8.kubernetes.client.utils.HttpClientUtils; +import io.fabric8.kubernetes.client.utils.Utils; +import okhttp3.HttpUrl; +import okhttp3.Request; + +class BaseOperationRequestBuilder> implements AbstractWatchManager.RequestBuilder { + private final URL requestUrl; + private final BaseOperation baseOperation; + private final ListOptions listOptions; + + public BaseOperationRequestBuilder(BaseOperation baseOperation, ListOptions listOptions) throws MalformedURLException { + this.baseOperation = baseOperation; + this.requestUrl = baseOperation.getNamespacedUrl(); + this.listOptions = listOptions; + } + + @Override + public Request build(final String resourceVersion) { + HttpUrl.Builder httpUrlBuilder = HttpUrl.get(requestUrl).newBuilder(); + + String labelQueryParam = baseOperation.getLabelQueryParam(); + if (Utils.isNotNullOrEmpty(labelQueryParam)) { + httpUrlBuilder.addQueryParameter("labelSelector", labelQueryParam); + } + + String fieldQueryString = baseOperation.getFieldQueryParam(); + String name = baseOperation.getName(); + + if (name != null && name.length() > 0) { + if (fieldQueryString.length() > 0) { + fieldQueryString += ","; + } + fieldQueryString += "metadata.name=" + name; + } + if (Utils.isNotNullOrEmpty(fieldQueryString)) { + httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString); + } + + listOptions.setResourceVersion(resourceVersion); + HttpClientUtils.appendListOptionParams(httpUrlBuilder, listOptions); + + String origin = requestUrl.getProtocol() + "://" + requestUrl.getHost(); + if (requestUrl.getPort() != -1) { + origin += ":" + requestUrl.getPort(); + } + + return new Request.Builder() + .get() + .url(httpUrlBuilder.build()) + .addHeader("Origin", origin) + .build(); + } +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawRequestBuilder.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawRequestBuilder.java new file mode 100644 index 00000000000..ba9b92fc7d2 --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawRequestBuilder.java @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.dsl.internal; + +import okhttp3.HttpUrl; +import okhttp3.Request; + +class RawRequestBuilder implements AbstractWatchManager.RequestBuilder { + private final HttpUrl.Builder watchUrlBuilder; + + public RawRequestBuilder(HttpUrl.Builder watchUrlBuilder) { + this.watchUrlBuilder = watchUrlBuilder; + } + + @Override + public Request build(String resourceVersion) { + if (resourceVersion != null) { + watchUrlBuilder.removeAllQueryParameters("resourceVersion"); + watchUrlBuilder.addQueryParameter("resourceVersion", resourceVersion); + } + HttpUrl watchUrl = watchUrlBuilder.build(); + String origin = watchUrl.url().getProtocol() + "://" + watchUrl.url().getHost(); + if (watchUrl.url().getPort() != -1) { + origin += ":" + watchUrl.url().getPort(); + } + + return new Request.Builder() + .get() + .url(watchUrl) + .addHeader("Origin", origin) + .build(); + } +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java index 0780810b62b..7f4a19e62ca 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java @@ -17,227 +17,63 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.fabric8.kubernetes.api.model.ListOptions; -import io.fabric8.kubernetes.api.model.Status; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; -import io.fabric8.kubernetes.client.WatcherException; -import io.fabric8.kubernetes.client.dsl.base.OperationSupport; -import io.fabric8.kubernetes.client.utils.Utils; import okhttp3.HttpUrl; import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; import okhttp3.WebSocket; -import okhttp3.WebSocketListener; -import okio.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicReference; -import static java.net.HttpURLConnection.HTTP_OK; - /** * This class just replicates WatchConnectionManager in handling watch connections but * instead of using a solid type for deserializing events, it uses plain strings. * */ public class RawWatchConnectionManager extends AbstractWatchManager { - private static final Logger logger = LoggerFactory.getLogger(RawWatchConnectionManager.class); - private ObjectMapper objectMapper; - private HttpUrl.Builder watchUrlBuilder; - - private final AtomicReference webSocketRef = new AtomicReference<>(); - /** True if an onOpen callback was received on the first connect attempt, ie. the watch was successfully started. */ - private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean reconnectPending = new AtomicBoolean(false); - /** Blocking queue for startup exceptions. */ - private final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1); - public RawWatchConnectionManager(OkHttpClient okHttpClient, HttpUrl.Builder watchUrlBuilder, ListOptions listOptions, ObjectMapper objectMapper, final Watcher watcher, int reconnectLimit, int reconnectInterval, int maxIntervalExponent) { - super( - watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, - okHttpClient.newBuilder().build() - ); - this.watchUrlBuilder = watchUrlBuilder; - this.objectMapper = objectMapper; - - runWatch(); - } - - private void runWatch() { - if (resourceVersion.get() != null) { - watchUrlBuilder.removeAllQueryParameters("resourceVersion"); - watchUrlBuilder.addQueryParameter("resourceVersion", resourceVersion.get()); - } - HttpUrl watchUrl = watchUrlBuilder.build(); - String origin = watchUrl.url().getProtocol() + "://" + watchUrl.url().getHost(); - if (watchUrl.url().getPort() != -1) { - origin += ":" + watchUrl.url().getPort(); - } - - Request request = new Request.Builder() - .get() - .url(watchUrl) - .addHeader("Origin", origin) - .build(); - clonedClient.newWebSocket(request, new WebSocketListener() { + super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, new RawRequestBuilder(watchUrlBuilder)); + + initRunner(new WebSocketClientRunner(okHttpClient) { @Override - public void onOpen(WebSocket webSocket, Response response) { - logger.info("Websocket opened"); - webSocketRef.set(webSocket); - currentReconnectAttempt.set(0); - started.set(true); - queue.clear(); - queue.add(true); + WatcherWebSocketListener newListener(BlockingQueue queue, AtomicReference webSocketRef) { + return new RawWatcherWebSocketListener(RawWatchConnectionManager.this, queue, webSocketRef, objectMapper); } - + @Override - public void onMessage(WebSocket webSocket, String text) { - try { - Map watchEvent = objectMapper.readValue(text, HashMap.class); - - String watchEventType = watchEvent.get("type").toString(); - String watchObjectAsString = objectMapper.writeValueAsString(watchEvent.get("object")); - - watcher.eventReceived(Watcher.Action.valueOf(watchEventType), watchObjectAsString); - - } catch (IOException exception) { - logger.error("Failed to deserialize watch response: " + exception.getMessage()); - } - } - - @Override - public void onMessage(WebSocket webSocket, ByteString bytes) { - onMessage(webSocket, bytes.utf8()); - } - - @Override - public void onClosing(WebSocket webSocket, int code, String reason) { - logger.info("Socket closing: " + reason); - webSocket.close(code, reason); - } - - @Override - public void onClosed(WebSocket webSocket, int code, String reason) { - logger.debug("WebSocket close received. code: {}, reason: {}", code, reason); - if (forceClosed.get()) { - logger.debug("Ignoring onClose for already closed/closing websocket"); - return; - } - if (cannotReconnect()) { - closeEvent(new WatcherException("Connection unexpectedly closed")); - return; - } - scheduleReconnect(); - } - - @Override - public void onFailure(WebSocket webSocket, Throwable t, Response response) { - if (forceClosed.get()) { - logger.debug("Ignoring onFailure for already closed/closing websocket", t); - // avoid resource leak though - if (response != null && response.body() != null) { - response.body().close(); - } - return; - } - - // We do not expect a 200 in response to the websocket connection. If it occurs, we throw - // an exception and try the watch via a persistent HTTP Get. - if (response != null && response.code() == HTTP_OK) { - queue.clear(); - queue.offer(new KubernetesClientException("Received 200 on websocket", - response.code(), null)); - response.body().close(); - return; - } - - if (response != null) { - // We only need to queue startup failures. - Status status = OperationSupport.createStatus(response); - if (response.body() != null) { - response.body().close(); - } - logger.warn("Exec Failure: HTTP {}, Status: {} - {}", response.code(), status.getCode(), status.getMessage(), - t); - if (!started.get()) { - queue.clear(); - queue.offer(new KubernetesClientException(status)); - } - } else { - logger.warn("Exec Failure", t); - if (!started.get()) { - queue.clear(); - queue.offer(new KubernetesClientException("Failed to start websocket", t)); - } - } - - if (cannotReconnect()) { - closeEvent(new WatcherException("Connection failure", t)); - return; - } - - scheduleReconnect(); + OkHttpClient cloneAndCustomize(OkHttpClient client) { + return okHttpClient.newBuilder().build(); } }); + runWatch(); } - - private void scheduleReconnect() { - logger.debug("Submitting reconnect task to the executor"); - // make sure that whichever thread calls this method, the tasks are - // performed serially in the executor - submit(new NamedRunnable("scheduleReconnect") { - @Override - public void execute() { - if (!reconnectPending.compareAndSet(false, true)) { - logger.debug("Reconnect already scheduled"); - return; - } - webSocketRef.set(null); - try { - // actual reconnect only after the back-off time has passed, without - // blocking the thread - logger.debug("Scheduling reconnect task"); - schedule(new NamedRunnable("reconnectAttempt") { - @Override - public void execute() { - try { - runWatch(); - reconnectPending.set(false); - } catch (Exception e) { - // An unexpected error occurred and we didn't even get an onFailure callback. - logger.error("Exception in reconnect", e); - webSocketRef.set(null); - closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e)); - close(); - } - } - }, nextReconnectInterval(), TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException e) { - reconnectPending.set(false); - } + + private static class RawWatcherWebSocketListener extends WatcherWebSocketListener { + private final ObjectMapper objectMapper; + + public RawWatcherWebSocketListener(AbstractWatchManager manager, BlockingQueue queue, AtomicReference webSocketRef, ObjectMapper objectMapper) { + super(manager, queue, webSocketRef); + this.objectMapper = objectMapper; + } + + @Override + public void onMessage(WebSocket webSocket, String text) { + try { + Map watchEvent = objectMapper.readValue(text, HashMap.class); + + String watchEventType = watchEvent.get("type").toString(); + String watchObjectAsString = objectMapper.writeValueAsString(watchEvent.get("object")); + + manager.eventReceived(Watcher.Action.valueOf(watchEventType), watchObjectAsString); + + } catch (IOException exception) { + logger.error("Failed to deserialize watch response: " + exception.getMessage()); } - }); - } - - public void waitUntilReady() { - Utils.waitUntilReady(queue, 10, TimeUnit.SECONDS); - } - - @Override - public void close() { - logger.debug("Force closing the watch {}", this); - closeEvent(); - closeWebSocket(webSocketRef.getAndSet(null)); - closeExecutorService(); + } } - } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index eb172cd4607..97a450116cc 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -15,6 +15,12 @@ */ package io.fabric8.kubernetes.client.dsl.internal; +import java.net.MalformedURLException; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.ListOptions; @@ -25,291 +31,92 @@ import io.fabric8.kubernetes.client.Watcher.Action; import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.base.BaseOperation; -import io.fabric8.kubernetes.client.dsl.base.OperationSupport; -import io.fabric8.kubernetes.client.utils.HttpClientUtils; -import io.fabric8.kubernetes.client.utils.Utils; -import okhttp3.HttpUrl; import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; import okhttp3.WebSocket; -import okhttp3.WebSocketListener; -import okio.ByteString; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.MalformedURLException; -import java.net.URL; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import static io.fabric8.kubernetes.client.dsl.internal.WatchHTTPManager.readWatchEvent; import static java.net.HttpURLConnection.HTTP_GONE; -import static java.net.HttpURLConnection.HTTP_OK; public class WatchConnectionManager> extends AbstractWatchManager { - - private static final Logger logger = LoggerFactory.getLogger(WatchConnectionManager.class); - - private final BaseOperation baseOperation; - private final AtomicReference webSocketRef = new AtomicReference<>(); - /** True if an onOpen callback was received on the first connect attempt, ie. the watch was successfully started. */ - private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean reconnectPending = new AtomicBoolean(false); - /** Blocking queue for startup exceptions. */ - private final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1); - private final URL requestUrl; - + public WatchConnectionManager(final OkHttpClient client, final BaseOperation baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout, int maxIntervalExponent) throws MalformedURLException { super( - watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, - client.newBuilder() - .readTimeout(websocketTimeout, TimeUnit.MILLISECONDS) - .build() + watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, new BaseOperationRequestBuilder<>(baseOperation, listOptions) ); - this.baseOperation = baseOperation; - - // The URL is created, validated and saved once, so that reconnect attempts don't have to deal with - // MalformedURLExceptions that would never occur - - requestUrl = baseOperation.getNamespacedUrl(); + + initRunner(new WebSocketClientRunner(client) { + @Override + WatcherWebSocketListener newListener(BlockingQueue queue, AtomicReference webSocketRef) { + return new TypedWatcherWebSocketListener<>(WatchConnectionManager.this, queue, webSocketRef); + } + + @Override + OkHttpClient cloneAndCustomize(OkHttpClient client) { + return client.newBuilder() + .readTimeout(websocketTimeout, TimeUnit.MILLISECONDS) + .build(); + } + }); runWatch(); } - + public WatchConnectionManager(final OkHttpClient client, final BaseOperation baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout) throws MalformedURLException { // Default max 32x slowdown from base interval this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, websocketTimeout, 5); } - - private void runWatch() { - logger.debug("Connecting websocket to {}...", requestUrl); - - HttpUrl.Builder httpUrlBuilder = HttpUrl.get(requestUrl).newBuilder(); - - String labelQueryParam = baseOperation.getLabelQueryParam(); - if (Utils.isNotNullOrEmpty(labelQueryParam)) { - httpUrlBuilder.addQueryParameter("labelSelector", labelQueryParam); - } - - String fieldQueryString = baseOperation.getFieldQueryParam(); - String name = baseOperation.getName(); - - if (name != null && name.length() > 0) { - if (fieldQueryString.length() > 0) { - fieldQueryString += ","; - } - fieldQueryString += "metadata.name=" + name; + + private static class TypedWatcherWebSocketListener extends WatcherWebSocketListener { + public TypedWatcherWebSocketListener(AbstractWatchManager manager, BlockingQueue queue, AtomicReference webSocketRef) { + super(manager, queue, webSocketRef); } - if (Utils.isNotNullOrEmpty(fieldQueryString)) { - httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString); - } - - listOptions.setResourceVersion(resourceVersion.get()); - HttpClientUtils.appendListOptionParams(httpUrlBuilder, listOptions); - - String origin = requestUrl.getProtocol() + "://" + requestUrl.getHost(); - if (requestUrl.getPort() != -1) { - origin += ":" + requestUrl.getPort(); - } - - Request request = new Request.Builder() - .get() - .url(httpUrlBuilder.build()) - .addHeader("Origin", origin) - .build(); - - clonedClient.newWebSocket(request, new WebSocketListener() { - @Override - public void onOpen(final WebSocket webSocket, Response response) { - if (response != null && response.body() != null) { - response.body().close(); - } - logger.debug("WebSocket successfully opened"); - webSocketRef.set(webSocket); - currentReconnectAttempt.set(0); - started.set(true); - queue.clear(); - queue.add(true); - } - - @Override - public void onFailure(WebSocket webSocket, Throwable t, Response response) { - if (forceClosed.get()) { - logger.debug("Ignoring onFailure for already closed/closing websocket", t); - // avoid resource leak though - if (response != null && response.body() != null) { - response.body().close(); - } - return; - } - - // We do not expect a 200 in response to the websocket connection. If it occurs, we throw - // an exception and try the watch via a persistent HTTP Get. - // Newer Kubernetes might also return 503 Service Unavailable in case WebSockets are not supported - if (response != null && (response.code() == HTTP_OK || response.code() == 503)) { - queue.clear(); - queue.offer(new KubernetesClientException("Received " + response.code() + " on websocket", - response.code(), null)); - response.body().close(); - return; - } - - if (response != null) { - // We only need to queue startup failures. - Status status = OperationSupport.createStatus(response); - if (response.body() != null) { - response.body().close(); + + @Override + public void onMessage(WebSocket webSocket, String message) { + try { + WatchEvent event = readWatchEvent(message); + Object object = event.getObject(); + if (object instanceof HasMetadata) { + @SuppressWarnings("unchecked") + T obj = (T) object; + manager.updateResourceVersion(obj.getMetadata().getResourceVersion()); + Action action = Action.valueOf(event.getType()); + manager.eventReceived(action, obj); + } else if (object instanceof KubernetesResourceList) { + // Dirty cast - should always be valid though + KubernetesResourceList list = (KubernetesResourceList) object; + manager.updateResourceVersion(list.getMetadata().getResourceVersion()); + Action action = Action.valueOf(event.getType()); + List items = list.getItems(); + if (items != null) { + for (HasMetadata item : items) { + manager.eventReceived(action, (T) item); + } } - logger.warn("Exec Failure: HTTP {}, Status: {} - {}", response.code(), status.getCode(), status.getMessage(), - t); - if (!started.get()) { - queue.clear(); - queue.offer(new KubernetesClientException(status)); + } else if (object instanceof Status) { + Status status = (Status) object; + + // The resource version no longer exists - this has to be handled by the caller. + if (status.getCode() == HTTP_GONE) { + webSocketRef.set(null); // lose the ref: closing in close() would only generate a Broken pipe + // exception + // shut down executor, etc. + manager.closeEvent(new WatcherException(status.getMessage(), new KubernetesClientException(status))); + manager.close(); + return; } + + manager.eventReceived(Action.ERROR, null); + logger.error("Error received: {}", status); } else { - logger.warn("Exec Failure", t); - if (!started.get()) { - queue.clear(); - queue.offer(new KubernetesClientException("Failed to start websocket", t)); - } - } - - if (cannotReconnect()) { - closeEvent(new WatcherException("Connection failure", t)); - return; - } - - scheduleReconnect(); - } - - @Override - public void onMessage(WebSocket webSocket, ByteString bytes) { - onMessage(webSocket, bytes.utf8()); - } - - @Override - public void onMessage(WebSocket webSocket, String message) { - try { - WatchEvent event = readWatchEvent(message); - Object object = event.getObject(); - if (object instanceof HasMetadata) { - @SuppressWarnings("unchecked") - T obj = (T) object; - resourceVersion.set(obj.getMetadata().getResourceVersion()); - Watcher.Action action = Watcher.Action.valueOf(event.getType()); - watcher.eventReceived(action, obj); - } else if (object instanceof KubernetesResourceList) { - KubernetesResourceList list = (KubernetesResourceList) object; - // Dirty cast - should always be valid though - resourceVersion.set(list.getMetadata().getResourceVersion()); - Watcher.Action action = Watcher.Action.valueOf(event.getType()); - List items = list.getItems(); - if (items != null) { - for (HasMetadata item : items) { - watcher.eventReceived(action, (T) item); - } - } - } else if (object instanceof Status) { - Status status = (Status) object; - - // The resource version no longer exists - this has to be handled by the caller. - if (status.getCode() == HTTP_GONE) { - webSocketRef.set(null); // lose the ref: closing in close() would only generate a Broken pipe - // exception - // shut down executor, etc. - closeEvent(new WatcherException(status.getMessage(), new KubernetesClientException(status))); - close(); - return; - } - - watcher.eventReceived(Action.ERROR, null); - logger.error("Error received: {}", status); - } else { - logger.error("Unknown message received: {}", message); - } - } catch (ClassCastException e) { - logger.error("Received wrong type of object for watch", e); - } catch (IllegalArgumentException e) { - logger.error("Invalid event type", e); - } catch (Throwable e) { - logger.error("Unhandled exception encountered in watcher event handler", e); - } - } - - @Override - public void onClosing(WebSocket webSocket, int code, String reason) { - webSocket.close(code, reason); - } - - @Override - public void onClosed(WebSocket webSocket, int code, String reason) { - logger.debug("WebSocket close received. code: {}, reason: {}", code, reason); - if (forceClosed.get()) { - logger.debug("Ignoring onClose for already closed/closing websocket"); - return; - } - if (cannotReconnect()) { - closeEvent(new WatcherException("Connection unexpectedly closed")); - return; + logger.error("Unknown message received: {}", message); } - scheduleReconnect(); + } catch (ClassCastException e) { + logger.error("Received wrong type of object for watch", e); + } catch (IllegalArgumentException e) { + logger.error("Invalid event type", e); + } catch (Throwable e) { + logger.error("Unhandled exception encountered in watcher event handler", e); } - }); - } - - private void scheduleReconnect() { - - logger.debug("Submitting reconnect task to the executor"); - // make sure that whichever thread calls this method, the tasks are - // performed serially in the executor - submit(new NamedRunnable("scheduleReconnect") { - @Override - public void execute() { - if (!reconnectPending.compareAndSet(false, true)) { - logger.debug("Reconnect already scheduled"); - return; - } - webSocketRef.set(null); - try { - // actual reconnect only after the back-off time has passed, without - // blocking the thread - logger.debug("Scheduling reconnect task"); - schedule(new NamedRunnable("reconnectAttempt") { - @Override - public void execute() { - try { - runWatch(); - reconnectPending.set(false); - } catch (Exception e) { - // An unexpected error occurred and we didn't even get an onFailure callback. - logger.error("Exception in reconnect", e); - webSocketRef.set(null); - closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e)); - close(); - } - } - }, nextReconnectInterval(), TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException e) { - reconnectPending.set(false); - } - } - }); - } - - public void waitUntilReady() { - Utils.waitUntilReady(queue, 10, TimeUnit.SECONDS); - } - - @Override - public void close() { - logger.debug("Force closing the watch {}", this); - closeEvent(); - closeWebSocket(webSocketRef.getAndSet(null)); - closeExecutorService(); + } } - } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index 1a7acb9be38..0ab76a81497 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -13,22 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package io.fabric8.kubernetes.client.dsl.internal; -import static java.net.HttpURLConnection.HTTP_GONE; - import java.io.IOException; import java.net.MalformedURLException; -import java.net.URL; import java.util.List; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResource; import io.fabric8.kubernetes.api.model.KubernetesResourceList; @@ -41,27 +34,24 @@ import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.base.BaseOperation; import io.fabric8.kubernetes.client.dsl.base.OperationSupport; -import io.fabric8.kubernetes.client.utils.HttpClientUtils; import io.fabric8.kubernetes.client.utils.Serialization; -import io.fabric8.kubernetes.client.utils.Utils; import okhttp3.Call; import okhttp3.Callback; -import okhttp3.HttpUrl; import okhttp3.Interceptor; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; import okhttp3.logging.HttpLoggingInterceptor; import okio.BufferedSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.net.HttpURLConnection.HTTP_GONE; public class WatchHTTPManager> extends AbstractWatchManager { private static final Logger logger = LoggerFactory.getLogger(WatchHTTPManager.class); - - private final BaseOperation baseOperation; - - private final AtomicBoolean reconnectPending = new AtomicBoolean(false); - private final URL requestUrl; - + + public WatchHTTPManager(final OkHttpClient client, final BaseOperation baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, @@ -70,213 +60,190 @@ public WatchHTTPManager(final OkHttpClient client, // Default max 32x slowdown from base interval this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, connectTimeout, 5); } - + public WatchHTTPManager(final OkHttpClient client, final BaseOperation baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit, long connectTimeout, int maxIntervalExponent) throws MalformedURLException { - + super( - watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, - client.newBuilder() - .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) - .readTimeout(0, TimeUnit.MILLISECONDS) - .cache(null) - .build() + watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, new BaseOperationRequestBuilder<>(baseOperation, listOptions) ); - this.baseOperation = baseOperation; - - - // If we set the HttpLoggingInterceptor's logging level to Body (as it is by default), it does - // not let us stream responses from the server. - for (Interceptor i : clonedClient.networkInterceptors()) { - if (i instanceof HttpLoggingInterceptor) { - HttpLoggingInterceptor interceptor = (HttpLoggingInterceptor) i; - interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC); - } - } - - requestUrl = baseOperation.getNamespacedUrl(); - runWatch(); - } - - private void runWatch() { - HttpUrl.Builder httpUrlBuilder = HttpUrl.get(requestUrl).newBuilder(); - String labelQueryParam = baseOperation.getLabelQueryParam(); - if (Utils.isNotNullOrEmpty(labelQueryParam)) { - httpUrlBuilder.addQueryParameter("labelSelector", labelQueryParam); - } - - String fieldQueryString = baseOperation.getFieldQueryParam(); - String name = baseOperation.getName(); - - if (name != null && name.length() > 0) { - if (fieldQueryString.length() > 0) { - fieldQueryString += ","; - } - fieldQueryString += "metadata.name=" + name; - } - - if (Utils.isNotNullOrEmpty(fieldQueryString)) { - httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString); - } - - listOptions.setResourceVersion(resourceVersion.get()); - HttpClientUtils.appendListOptionParams(httpUrlBuilder, listOptions); - String origin = requestUrl.getProtocol() + "://" + requestUrl.getHost(); - if (requestUrl.getPort() != -1) { - origin += ":" + requestUrl.getPort(); - } - - HttpUrl url = httpUrlBuilder.build(); - - logger.debug("Watching via HTTP GET {}", url); - - final Request request = new Request.Builder() - .get() - .url(url) - .addHeader("Origin", origin) - .build(); - - clonedClient.newCall(request).enqueue(new Callback() { - @Override - public void onFailure(Call call, IOException e) { - logger.info("Watch connection failed. reason: {}", e.getMessage()); - scheduleReconnect(true); - } - + + initRunner(new HTTPClientRunner(client, this) { @Override - public void onResponse(Call call, Response response) throws IOException { - if (!response.isSuccessful()) { - onStatus(OperationSupport.createStatus(response.code(), response.message())); - } - - boolean shouldBackoff = true; - - try { - BufferedSource source = response.body().source(); - while (!source.exhausted()) { - String message = source.readUtf8LineStrict(); - onMessage(message); + OkHttpClient cloneAndCustomize(OkHttpClient client) { + final OkHttpClient clonedClient = client.newBuilder() + .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) + .readTimeout(0, TimeUnit.MILLISECONDS) + .cache(null) + .build(); + // If we set the HttpLoggingInterceptor's logging level to Body (as it is by default), it does + // not let us stream responses from the server. + for (Interceptor i : clonedClient.networkInterceptors()) { + if (i instanceof HttpLoggingInterceptor) { + HttpLoggingInterceptor interceptor = (HttpLoggingInterceptor) i; + interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC); } - // the normal operation of a long poll get is to return once a response is available. - // in that case we should reconnect immediately. - shouldBackoff = false; - } catch (Exception e) { - logger.info("Watch terminated unexpectedly. reason: {}", e.getMessage()); - } - - // if we get here, the source is exhausted, so, we have lost our "watch". - // we must reconnect. - if (response != null) { - response.body().close(); } - - scheduleReconnect(shouldBackoff); + return clonedClient; } }); + + runWatch(); } - - private void scheduleReconnect(boolean shouldBackoff) { - if (forceClosed.get()) { - logger.warn("Ignoring error for already closed/closing connection"); - return; + + private abstract static class HTTPClientRunner extends AbstractWatchManager.ClientRunner { + private final AbstractWatchManager manager; + private final AtomicBoolean reconnectPending = new AtomicBoolean(false); + + public HTTPClientRunner(OkHttpClient client, AbstractWatchManager manager) { + super(client); + this.manager = manager; } - - if (cannotReconnect()) { - watcher.onClose(new WatcherException("Connection unexpectedly closed")); - return; + + @Override + void run(Request request) { + client().newCall(request).enqueue(new Callback() { + @Override + public void onFailure(Call call, IOException e) { + logger.info("Watch connection failed. reason: {}", e.getMessage()); + scheduleReconnect(true); + } + + @Override + public void onResponse(Call call, Response response) throws IOException { + if (!response.isSuccessful()) { + onStatus(OperationSupport.createStatus(response.code(), response.message())); + } + + boolean shouldBackoff = true; + + try { + BufferedSource source = response.body().source(); + while (!source.exhausted()) { + String message = source.readUtf8LineStrict(); + onMessage(message); + } + // the normal operation of a long poll get is to return once a response is available. + // in that case we should reconnect immediately. + shouldBackoff = false; + } catch (Exception e) { + logger.info("Watch terminated unexpectedly. reason: {}", e.getMessage()); + } + + // if we get here, the source is exhausted, so, we have lost our "watch". + // we must reconnect. + if (response != null) { + response.body().close(); + } + + scheduleReconnect(shouldBackoff); + } + }); } - - logger.debug("Submitting reconnect task to the executor"); - - // make sure that whichever thread calls this method, the tasks are - // performed serially in the executor. - submit(() -> { - if (!reconnectPending.compareAndSet(false, true)) { - logger.debug("Reconnect already scheduled"); + + private void scheduleReconnect(boolean shouldBackoff) { + if (manager.isForceClosed()) { + logger.warn("Ignoring error for already closed/closing connection"); return; } - try { - // actual reconnect only after the back-off time has passed, without - // blocking the thread - logger.debug("Scheduling reconnect task"); - - long delay = shouldBackoff - ? nextReconnectInterval() - : 0; - - schedule(() -> { - try { - WatchHTTPManager.this.runWatch(); - reconnectPending.set(false); - } catch (Exception e) { - // An unexpected error occurred and we didn't even get an onFailure callback. + + if (manager.cannotReconnect()) { + manager.onClose(new WatcherException("Connection unexpectedly closed")); + return; + } + + logger.debug("Submitting reconnect task to the executor"); + + // make sure that whichever thread calls this method, the tasks are + // performed serially in the executor. + manager.submit(() -> { + if (!reconnectPending.compareAndSet(false, true)) { + logger.debug("Reconnect already scheduled"); + return; + } + try { + // actual reconnect only after the back-off time has passed, without + // blocking the thread + logger.debug("Scheduling reconnect task"); + + long delay = shouldBackoff + ? manager.nextReconnectInterval() + : 0; + + manager.schedule(() -> { + try { + manager.runWatch(); + reconnectPending.set(false); + } catch (Exception e) { + // An unexpected error occurred and we didn't even get an onFailure callback. + logger.error("Exception in reconnect", e); + close(); + manager.onClose(new WatcherException("Unhandled exception in reconnect attempt", e)); + } + }, delay, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + // This is a standard exception if we close the scheduler. We should not print it + if (!manager.isForceClosed()) { logger.error("Exception in reconnect", e); - close(); - watcher.onClose(new WatcherException("Unhandled exception in reconnect attempt", e)); } - }, delay, TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException e) { - // This is a standard exception if we close the scheduler. We should not print it - if (!forceClosed.get()) { - logger.error("Exception in reconnect", e); + reconnectPending.set(false); } - reconnectPending.set(false); - } - }); - } - - public void onMessage(String messageSource) { - try { - WatchEvent event = readWatchEvent(messageSource); - KubernetesResource object = event.getObject(); - if (object instanceof HasMetadata) { - @SuppressWarnings("unchecked") - T obj = (T) object; - // Dirty cast - should always be valid though - resourceVersion.set(obj.getMetadata().getResourceVersion()); - Watcher.Action action = Watcher.Action.valueOf(event.getType()); - watcher.eventReceived(action, obj); - } else if (object instanceof KubernetesResourceList) { - KubernetesResourceList list = (KubernetesResourceList) object; - // Dirty cast - should always be valid though - resourceVersion.set(list.getMetadata().getResourceVersion()); - Watcher.Action action = Watcher.Action.valueOf(event.getType()); - List items = list.getItems(); - if (items != null) { - for (HasMetadata item : items) { - watcher.eventReceived(action, (T) item); + }); + } + + public void onMessage(String messageSource) { + try { + WatchEvent event = readWatchEvent(messageSource); + KubernetesResource object = event.getObject(); + if (object instanceof HasMetadata) { + // Dirty cast - should always be valid though + @SuppressWarnings("unchecked") + T obj = (T) object; + manager.updateResourceVersion(obj.getMetadata().getResourceVersion()); + Watcher.Action action = Watcher.Action.valueOf(event.getType()); + manager.eventReceived(action, obj); + } else if (object instanceof KubernetesResourceList) { + KubernetesResourceList list = (KubernetesResourceList) object; + // Dirty cast - should always be valid though + manager.updateResourceVersion(list.getMetadata().getResourceVersion()); + Watcher.Action action = Watcher.Action.valueOf(event.getType()); + List items = list.getItems(); + if (items != null) { + for (HasMetadata item : items) { + manager.eventReceived(action, (T) item); + } } + } else if (object instanceof Status) { + onStatus((Status) object); + } else { + logger.error("Unknown message received: {}", messageSource); } - } else if (object instanceof Status) { - onStatus((Status) object); - } else { - logger.error("Unknown message received: {}", messageSource); + } catch (ClassCastException e) { + logger.error("Received wrong type of object for watch", e); + } catch (IllegalArgumentException e) { + logger.error("Invalid event type", e); } - } catch (ClassCastException e) { - logger.error("Received wrong type of object for watch", e); - } catch (IllegalArgumentException e) { - logger.error("Invalid event type", e); } - } - - private void onStatus(Status status) { - // The resource version no longer exists - this has to be handled by the caller. - if (status.getCode() == HTTP_GONE) { - // exception - // shut down executor, etc. - close(); - watcher.onClose(new WatcherException(status.getMessage(), new KubernetesClientException(status))); - return; + + private void onStatus(Status status) { + // The resource version no longer exists - this has to be handled by the caller. + if (status.getCode() == HTTP_GONE) { + // exception + // shut down executor, etc. + close(); + manager.onClose(new WatcherException(status.getMessage(), new KubernetesClientException(status))); + return; + } + + manager.eventReceived(Action.ERROR, null); + logger.error("Error received: {}", status.toString()); } - - watcher.eventReceived(Action.ERROR, null); - logger.error("Error received: {}", status.toString()); } - - + + protected static WatchEvent readWatchEvent(String messageSource) { WatchEvent event = Serialization.unmarshal(messageSource, WatchEvent.class); KubernetesResource object = null; @@ -300,11 +267,4 @@ protected static WatchEvent readWatchEvent(String messageSource) { } return event; } - - @Override - public void close() { - logger.debug("Force closing the watch {}", this); - closeEvent(); - closeExecutorService(); - } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java new file mode 100644 index 00000000000..b42dccab410 --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -0,0 +1,191 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.dsl.internal; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import io.fabric8.kubernetes.api.model.Status; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.WatcherException; +import io.fabric8.kubernetes.client.dsl.base.OperationSupport; +import okhttp3.Response; +import okhttp3.WebSocket; +import okhttp3.WebSocketListener; +import okio.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.net.HttpURLConnection.HTTP_OK; +import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; + +abstract class WatcherWebSocketListener extends WebSocketListener { + protected static final Logger logger = LoggerFactory.getLogger(WatcherWebSocketListener.class); + + protected final AtomicReference webSocketRef; + /** + * True if an onOpen callback was received on the first connect attempt, ie. the watch was successfully started. + */ + private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean reconnectPending = new AtomicBoolean(false); + /** + * Blocking queue for startup exceptions. + */ + private final BlockingQueue queue; + protected final AbstractWatchManager manager; + + protected WatcherWebSocketListener(AbstractWatchManager manager, BlockingQueue queue, AtomicReference webSocketRef) { + this.manager = manager; + this.queue = queue; + this.webSocketRef = webSocketRef; + } + + @Override + public void onOpen(final WebSocket webSocket, Response response) { + if (response != null && response.body() != null) { + response.body().close(); + } + logger.debug("WebSocket successfully opened"); + webSocketRef.set(webSocket); + manager.resetReconnectAttempts(); + started.set(true); + queue.clear(); + queue.add(true); + } + + @Override + public void onFailure(WebSocket webSocket, Throwable t, Response response) { + if (manager.isForceClosed()) { + logger.debug("Ignoring onFailure for already closed/closing websocket", t); + // avoid resource leak though + if (response != null && response.body() != null) { + response.body().close(); + } + return; + } + + if (response != null) { + final int code = response.code(); + // We do not expect a 200 in response to the websocket connection. If it occurs, we throw + // an exception and try the watch via a persistent HTTP Get. + // Newer Kubernetes might also return 503 Service Unavailable in case WebSockets are not supported + if (HTTP_OK == code || HTTP_UNAVAILABLE == code) { + pushException(new KubernetesClientException("Received " + code + " on websocket", code, null)); + closeBody(response); + return; + } else { + // We only need to queue startup failures. + Status status = OperationSupport.createStatus(response); + closeBody(response); + logger.warn("Exec Failure: HTTP {}, Status: {} - {}", code, status.getCode(), status.getMessage(), t); + if (!started.get()) { + pushException(new KubernetesClientException(status)); + } + } + } else { + logger.warn("Exec Failure", t); + if (!started.get()) { + pushException(new KubernetesClientException("Failed to start websocket", t)); + } + } + + if (manager.cannotReconnect()) { + manager.closeEvent(new WatcherException("Connection failure", t)); + return; + } + + scheduleReconnect(); + } + + private void pushException(KubernetesClientException exception) { + queue.clear(); + if (!queue.offer(exception)) { + logger.debug("Couldn't add exception {} to queue", exception.getLocalizedMessage()); + } + } + + private void closeBody(Response response) { + if (response.body() != null) { + response.body().close(); + } + } + + @Override + public void onMessage(WebSocket webSocket, ByteString bytes) { + onMessage(webSocket, bytes.utf8()); + } + + @Override + public void onClosing(WebSocket webSocket, int code, String reason) { + logger.debug("Socket closing: {}", reason); + webSocket.close(code, reason); + } + + @Override + public void onClosed(WebSocket webSocket, int code, String reason) { + logger.debug("WebSocket close received. code: {}, reason: {}", code, reason); + if (manager.isForceClosed()) { + logger.debug("Ignoring onClose for already closed/closing websocket"); + return; + } + if (manager.cannotReconnect()) { + manager.closeEvent(new WatcherException("Connection unexpectedly closed")); + return; + } + scheduleReconnect(); + } + + private void scheduleReconnect() { + logger.debug("Submitting reconnect task to the executor"); + // make sure that whichever thread calls this method, the tasks are + // performed serially in the executor + manager.submit(new NamedRunnable("scheduleReconnect") { + @Override + public void execute() { + if (!reconnectPending.compareAndSet(false, true)) { + logger.debug("Reconnect already scheduled"); + return; + } + webSocketRef.set(null); + try { + // actual reconnect only after the back-off time has passed, without + // blocking the thread + logger.debug("Scheduling reconnect task"); + manager.schedule(new NamedRunnable("reconnectAttempt") { + @Override + public void execute() { + try { + manager.runWatch(); + reconnectPending.set(false); + } catch (Exception e) { + // An unexpected error occurred and we didn't even get an onFailure callback. + logger.error("Exception in reconnect", e); + webSocketRef.set(null); + manager.closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e)); + manager.close(); + } + } + }, manager.nextReconnectInterval(), TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + reconnectPending.set(false); + } + } + }); + } +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java new file mode 100644 index 00000000000..228ef6b4f7e --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.dsl.internal; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import io.fabric8.kubernetes.client.utils.Utils; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.WebSocket; + +abstract class WebSocketClientRunner extends AbstractWatchManager.ClientRunner { + private final AtomicReference webSocketRef = new AtomicReference<>(); + private final BlockingQueue queue = new ArrayBlockingQueue<>(1); + + protected WebSocketClientRunner(OkHttpClient client) { + super(client); + } + + @Override + public void run(Request request) { + client().newWebSocket(request, newListener(queue, webSocketRef)); + } + + abstract WatcherWebSocketListener newListener(BlockingQueue queue, AtomicReference webSocketRef); + + @Override + public void close() { + AbstractWatchManager.closeWebSocket(webSocketRef.getAndSet(null)); + } + + @Override + public void waitUntilReady() { + Utils.waitUntilReady(queue, 10, TimeUnit.SECONDS); + } +} diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java index 023d8d5ff09..a1a482b2ab8 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java @@ -19,6 +19,7 @@ import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; import okhttp3.OkHttpClient; +import okhttp3.Request; import okhttp3.WebSocket; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -195,9 +196,8 @@ private static class WatcherAdapter implements Watcher { private final AtomicInteger closeCount = new AtomicInteger(0); @Override - public void eventReceived(Action action, T resource) { - - } + public void eventReceived(Action action, T resource) {} + @Override public void onClose(WatcherException cause) { closeCount.addAndGet(1); @@ -212,7 +212,16 @@ public void onClose() { private static final class WatchManager extends AbstractWatchManager { public WatchManager(Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, OkHttpClient clonedClient) { - super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, clonedClient); + super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, resourceVersion -> null); + initRunner(new ClientRunner(clonedClient) { + @Override + void run(Request request) {} + + @Override + OkHttpClient cloneAndCustomize(OkHttpClient client) { + return clonedClient; + } + }); } @Override public void close() {