diff --git a/CHANGELOG.md b/CHANGELOG.md index 22d85be080b..6fafb93a7cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ #### Bugs #### Improvements +* Fix #2614: Watcher.onClose has dedicated WatcherException as parameter. #### Dependency Upgrade diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/Watcher.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/Watcher.java index 88337a8dcc5..27f88f3bfac 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/Watcher.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/Watcher.java @@ -15,16 +15,25 @@ */ package io.fabric8.kubernetes.client; +import org.slf4j.LoggerFactory; + public interface Watcher { void eventReceived(Action action, T resource); /** - * Run when the watcher finally closes. + * Invoked when the watcher is gracefully closed. + */ + default void onClose() { + LoggerFactory.getLogger(Watcher.class).debug("Watcher closed"); + } + + /** + * Invoked when the watcher closes due to an Exception. * - * @param cause What caused the watcher to be closed. Null means normal close. + * @param cause What caused the watcher to be closed. */ - void onClose(KubernetesClientException cause); + void onClose(WatcherException cause); enum Action { ADDED, MODIFIED, DELETED, ERROR diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/WatcherException.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/WatcherException.java new file mode 100644 index 00000000000..c5c61e9398d --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/WatcherException.java @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client; + +import java.net.HttpURLConnection; + +public class WatcherException extends Exception { + + public WatcherException(String message, Throwable cause) { + super(message, cause); + } + + public WatcherException(String message) { + super(message); + } + + public KubernetesClientException asClientException() { + final Throwable cause = getCause(); + return cause instanceof KubernetesClientException ? + (KubernetesClientException) cause : new KubernetesClientException(getMessage(), cause); + } + + public boolean isHttpGone() { + final KubernetesClientException cause = asClientException(); + return cause.getCode() == HttpURLConnection.HTTP_GONE + || (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE); + } + + public boolean isShouldRetry() { + return getCause() == null || !isHttpGone(); + } +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java index b66b85d8885..6ffbc36c987 100755 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java @@ -16,6 +16,7 @@ package io.fabric8.kubernetes.client.dsl.base; import io.fabric8.kubernetes.api.model.ObjectReference; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +46,6 @@ import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; import io.fabric8.kubernetes.client.dsl.Replaceable; import io.fabric8.kubernetes.client.dsl.Resource; -import io.fabric8.kubernetes.client.dsl.base.WaitForConditionWatcher.WatchException; import io.fabric8.kubernetes.client.dsl.internal.DefaultOperationInfo; import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager; import io.fabric8.kubernetes.client.dsl.internal.WatchHTTPManager; @@ -1104,7 +1104,7 @@ private T waitUntilConditionWithRetries(Predicate condition, long timeoutNano return watcher.getFuture().get(remainingNanosToWait, NANOSECONDS); } catch (ExecutionException e) { Throwable cause = e.getCause(); - if (cause instanceof WatchException && ((WatchException) cause).isShouldRetry()) { + if (cause instanceof WatcherException && ((WatcherException) cause).isShouldRetry()) { LOG.debug("retryable watch exception encountered, retrying after {} millis", currentBackOff, cause); Thread.sleep(currentBackOff); currentBackOff *= watchRetryBackoffMultiplier; diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcher.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcher.java index 52dcb1a9f15..a0ea87880c1 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcher.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcher.java @@ -15,13 +15,12 @@ */ package io.fabric8.kubernetes.client.dsl.base; -import java.net.HttpURLConnection; import java.util.concurrent.CompletableFuture; import java.util.function.Predicate; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; public class WaitForConditionWatcher implements Watcher { @@ -52,34 +51,18 @@ public void eventReceived(Action action, T resource) { } break; case ERROR: - future.completeExceptionally(new WatchException("Action.ERROR received")); + future.completeExceptionally(new WatcherException("Action.ERROR received")); break; } } @Override - public void onClose(KubernetesClientException cause) { - future.completeExceptionally(new WatchException("Watcher closed", cause)); + public void onClose(WatcherException cause) { + future.completeExceptionally(cause); } - public static class WatchException extends Exception { - - public WatchException(String message, KubernetesClientException cause) { - super(message, cause); - } - - public WatchException(String message) { - super(message); - } - - public boolean isShouldRetry() { - return getCause() == null || !isHttpGone(); - } - - private boolean isHttpGone() { - KubernetesClientException cause = ((KubernetesClientException) getCause()); - return cause.getCode() == HttpURLConnection.HTTP_GONE - || (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE); - } + @Override + public void onClose() { + future.completeExceptionally(new WatcherException("Watcher closed")); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java new file mode 100644 index 00000000000..ff73a6d4fce --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -0,0 +1,140 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.dsl.internal; + +import io.fabric8.kubernetes.api.model.ListOptions; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.Watch; +import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; +import okhttp3.OkHttpClient; +import okhttp3.WebSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class AbstractWatchManager implements Watch { + + private static final Logger logger = LoggerFactory.getLogger(AbstractWatchManager.class); + + final Watcher watcher; + final ListOptions listOptions; + final AtomicReference resourceVersion; + final OkHttpClient clonedClient; + + final AtomicBoolean forceClosed; + private final int reconnectLimit; + private final int reconnectInterval; + private final int maxIntervalExponent; + final AtomicInteger currentReconnectAttempt; + private final ScheduledExecutorService executorService; + + + AbstractWatchManager( + Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, + OkHttpClient clonedClient + ) { + this.watcher = watcher; + this.listOptions = listOptions; + this.reconnectLimit = reconnectLimit; + this.reconnectInterval = reconnectInterval; + this.maxIntervalExponent = maxIntervalExponent; + this.clonedClient = clonedClient; + this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion()); + this.currentReconnectAttempt = new AtomicInteger(0); + this.forceClosed = new AtomicBoolean(); + this.executorService = Executors.newSingleThreadScheduledExecutor(r -> { + Thread ret = new Thread(r, "Executor for Watch " + System.identityHashCode(AbstractWatchManager.this)); + ret.setDaemon(true); + return ret; + }); + } + + final void closeEvent(WatcherException cause) { + if (forceClosed.getAndSet(true)) { + logger.debug("Ignoring duplicate firing of onClose event"); + return; + } + watcher.onClose(cause); + } + + final void closeEvent() { + if (forceClosed.getAndSet(true)) { + logger.debug("Ignoring duplicate firing of onClose event"); + return; + } + watcher.onClose(); + } + + final void closeExecutorService() { + if (executorService != null && !executorService.isShutdown()) { + logger.debug("Closing ExecutorService"); + try { + executorService.shutdown(); + if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) { + logger.warn("Executor didn't terminate in time after shutdown in close(), killing it."); + executorService.shutdownNow(); + } + } catch (Exception t) { + throw KubernetesClientException.launderThrowable(t); + } + } + } + + void submit(Runnable task) { + if (!executorService.isShutdown()) { + executorService.submit(task); + } + } + + void schedule(Runnable command, long delay, TimeUnit timeUnit) { + if (!executorService.isShutdown()) { + executorService.schedule(command, delay, timeUnit); + } + } + + final boolean cannotReconnect() { + return currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0; + } + + final long nextReconnectInterval() { + int exponentOfTwo = currentReconnectAttempt.getAndIncrement(); + if (exponentOfTwo > maxIntervalExponent) + exponentOfTwo = maxIntervalExponent; + long ret = (long)reconnectInterval * (1 << exponentOfTwo); + logger.debug("Current reconnect backoff is {} milliseconds (T{})", ret, exponentOfTwo); + return ret; + } + + static void closeWebSocket(WebSocket webSocket) { + if (webSocket != null) { + logger.debug("Closing websocket {}", webSocket); + try { + if (!webSocket.close(1000, null)) { + logger.warn("Failed to close websocket"); + } + } catch (IllegalStateException e) { + logger.error("Called close on already closed websocket: {} {}", e.getClass(), e.getMessage()); + } + } + } +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamedRunnable.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamedRunnable.java new file mode 100644 index 00000000000..3928cde14d1 --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamedRunnable.java @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.dsl.internal; + +import java.util.Objects; + +abstract class NamedRunnable implements Runnable { + private final String name; + + NamedRunnable(String name) { + this.name = Objects.requireNonNull(name); + } + + private void tryToSetName(String value) { + try { + Thread.currentThread().setName(value); + } catch (SecurityException ignored) { + // Ignored + } + } + + public final void run() { + String oldName = Thread.currentThread().getName(); + tryToSetName(this.name + "|" + oldName); + try { + execute(); + } finally { + tryToSetName(oldName); + } + } + + protected abstract void execute(); +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java index 4f4573bd46e..0780810b62b 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java @@ -19,8 +19,8 @@ import io.fabric8.kubernetes.api.model.ListOptions; import io.fabric8.kubernetes.api.model.Status; import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.base.OperationSupport; import io.fabric8.kubernetes.client.utils.Utils; import okhttp3.HttpUrl; @@ -36,14 +36,10 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static java.net.HttpURLConnection.HTTP_OK; @@ -53,44 +49,25 @@ * instead of using a solid type for deserializing events, it uses plain strings. * */ -public class RawWatchConnectionManager implements Watch { +public class RawWatchConnectionManager extends AbstractWatchManager { private static final Logger logger = LoggerFactory.getLogger(RawWatchConnectionManager.class); private ObjectMapper objectMapper; - private Watcher watcher; private HttpUrl.Builder watchUrlBuilder; - private final AtomicBoolean forceClosed = new AtomicBoolean(); - private final AtomicReference resourceVersion; - private final int reconnectLimit; - private final int reconnectInterval; - private int maxIntervalExponent; - private final AtomicInteger currentReconnectAttempt = new AtomicInteger(0); private final AtomicReference webSocketRef = new AtomicReference<>(); - // single threaded serial executor - private final ScheduledExecutorService executor; /** True if an onOpen callback was received on the first connect attempt, ie. the watch was successfully started. */ private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean reconnectPending = new AtomicBoolean(false); /** Blocking queue for startup exceptions. */ private final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1); - private WebSocket webSocket; - private OkHttpClient clonedClient; - public RawWatchConnectionManager(OkHttpClient okHttpClient, HttpUrl.Builder watchUrlBuilder, ListOptions listOptions, ObjectMapper objectMapper, final Watcher watcher, int reconnectLimit, int reconnectInterval, int maxIntervalExponent) { - this.clonedClient = okHttpClient; + super( + watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, + okHttpClient.newBuilder().build() + ); this.watchUrlBuilder = watchUrlBuilder; this.objectMapper = objectMapper; - this.watcher = watcher; - this.reconnectLimit = reconnectLimit; - this.reconnectInterval = reconnectInterval; - this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion()); - this.maxIntervalExponent = maxIntervalExponent; - executor = Executors.newSingleThreadScheduledExecutor(r -> { - Thread ret = new Thread(r, "Executor for Watch " + System.identityHashCode(RawWatchConnectionManager.this)); - ret.setDaemon(true); - return ret; - }); runWatch(); } @@ -111,7 +88,7 @@ private void runWatch() { .url(watchUrl) .addHeader("Origin", origin) .build(); - webSocket = clonedClient.newWebSocket(request, new WebSocketListener() { + clonedClient.newWebSocket(request, new WebSocketListener() { @Override public void onOpen(WebSocket webSocket, Response response) { logger.info("Websocket opened"); @@ -155,8 +132,8 @@ public void onClosed(WebSocket webSocket, int code, String reason) { logger.debug("Ignoring onClose for already closed/closing websocket"); return; } - if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) { - closeEvent(new KubernetesClientException("Connection unexpectedly closed")); + if (cannotReconnect()) { + closeEvent(new WatcherException("Connection unexpectedly closed")); return; } scheduleReconnect(); @@ -203,8 +180,8 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) { } } - if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) { - closeEvent(new KubernetesClientException("Connection failure", t)); + if (cannotReconnect()) { + closeEvent(new WatcherException("Connection failure", t)); return; } @@ -214,45 +191,41 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) { } private void scheduleReconnect() { - logger.debug("Submitting reconnect task to the executor"); - // Don't submit new tasks after having called shutdown() on executor - if(!executor.isShutdown()) { - // make sure that whichever thread calls this method, the tasks are - // performed serially in the executor - executor.submit(new RawWatchConnectionManager.NamedRunnable("scheduleReconnect") { - @Override - public void execute() { - if (!reconnectPending.compareAndSet(false, true)) { - logger.debug("Reconnect already scheduled"); - return; - } - webSocketRef.set(null); - try { - // actual reconnect only after the back-off time has passed, without - // blocking the thread - logger.debug("Scheduling reconnect task"); - executor.schedule(new RawWatchConnectionManager.NamedRunnable("reconnectAttempt") { - @Override - public void execute() { - try { - runWatch(); - reconnectPending.set(false); - } catch (Exception e) { - // An unexpected error occurred and we didn't even get an onFailure callback. - logger.error("Exception in reconnect", e); - webSocketRef.set(null); - closeEvent(new KubernetesClientException("Unhandled exception in reconnect attempt", e)); - close(); - } + // make sure that whichever thread calls this method, the tasks are + // performed serially in the executor + submit(new NamedRunnable("scheduleReconnect") { + @Override + public void execute() { + if (!reconnectPending.compareAndSet(false, true)) { + logger.debug("Reconnect already scheduled"); + return; + } + webSocketRef.set(null); + try { + // actual reconnect only after the back-off time has passed, without + // blocking the thread + logger.debug("Scheduling reconnect task"); + schedule(new NamedRunnable("reconnectAttempt") { + @Override + public void execute() { + try { + runWatch(); + reconnectPending.set(false); + } catch (Exception e) { + // An unexpected error occurred and we didn't even get an onFailure callback. + logger.error("Exception in reconnect", e); + webSocketRef.set(null); + closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e)); + close(); } - }, nextReconnectInterval(), TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException e) { - reconnectPending.set(false); - } + } + }, nextReconnectInterval(), TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + reconnectPending.set(false); } - }); - } + } + }); } public void waitUntilReady() { @@ -262,75 +235,9 @@ public void waitUntilReady() { @Override public void close() { logger.debug("Force closing the watch {}", this); - closeEvent(null); + closeEvent(); closeWebSocket(webSocketRef.getAndSet(null)); - if (!executor.isShutdown()) { - try { - executor.shutdown(); - if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { - logger.warn("Executor didn't terminate in time after shutdown in close(), killing it in: {}", this); - executor.shutdownNow(); - } - } catch (Throwable t) { - throw KubernetesClientException.launderThrowable(t); - } - } - } - - private void closeEvent(KubernetesClientException cause) { - if (forceClosed.getAndSet(true)) { - logger.debug("Ignoring duplicate firing of onClose event"); - return; - } - watcher.onClose(cause); + closeExecutorService(); } - private void closeWebSocket(WebSocket ws) { - if (ws != null) { - logger.debug("Closing websocket {}", ws); - try { - if (!ws.close(1000, null)) { - logger.warn("Failed to close websocket"); - } - } catch (IllegalStateException e) { - logger.error("Called close on already closed websocket: {} {}", e.getClass(), e.getMessage()); - } - } - } - - private long nextReconnectInterval() { - int exponentOfTwo = currentReconnectAttempt.getAndIncrement(); - if (exponentOfTwo > maxIntervalExponent) - exponentOfTwo = maxIntervalExponent; - long ret = reconnectInterval * (1 << exponentOfTwo); - logger.debug("Current reconnect backoff is " + ret + " milliseconds (T" + exponentOfTwo + ")"); - return ret; - } - - private static abstract class NamedRunnable implements Runnable { - private final String name; - - public NamedRunnable(String name) { - this.name = Objects.requireNonNull(name); - } - - private void tryToSetName(String value) { - try { - Thread.currentThread().setName(value); - } catch (SecurityException ignored) { - } - } - - public final void run() { - String oldName = Thread.currentThread().getName(); - tryToSetName(this.name + "|" + oldName); - try { - execute(); - } finally { - tryToSetName(oldName); - } - } - - protected abstract void execute(); - } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index c71c8cdab72..2062aef63ec 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -21,49 +21,42 @@ import io.fabric8.kubernetes.api.model.Status; import io.fabric8.kubernetes.api.model.WatchEvent; import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.Watcher.Action; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.base.BaseOperation; import io.fabric8.kubernetes.client.dsl.base.OperationSupport; import io.fabric8.kubernetes.client.utils.HttpClientUtils; import io.fabric8.kubernetes.client.utils.Utils; -import okhttp3.*; +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.WebSocket; +import okhttp3.WebSocketListener; import okio.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.util.List; -import java.util.Objects; -import java.util.concurrent.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static io.fabric8.kubernetes.client.dsl.internal.WatchHTTPManager.readWatchEvent; import static java.net.HttpURLConnection.HTTP_GONE; import static java.net.HttpURLConnection.HTTP_OK; -public class WatchConnectionManager> implements Watch { +public class WatchConnectionManager> extends AbstractWatchManager { private static final Logger logger = LoggerFactory.getLogger(WatchConnectionManager.class); - private final AtomicBoolean forceClosed = new AtomicBoolean(); - private final AtomicReference resourceVersion; - private final ListOptions listOptions; private final BaseOperation baseOperation; - private final Watcher watcher; - private final int reconnectLimit; - private final int reconnectInterval; - private int maxIntervalExponent; - private final long websocketTimeout; - private final AtomicInteger currentReconnectAttempt = new AtomicInteger(0); private final AtomicReference webSocketRef = new AtomicReference<>(); - // single threaded serial executor - private final ScheduledExecutorService executor; /** True if an onOpen callback was received on the first connect attempt, ie. the watch was successfully started. */ private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean reconnectPending = new AtomicBoolean(false); @@ -71,34 +64,19 @@ public class WatchConnectionManager queue = new ArrayBlockingQueue<>(1); private final URL requestUrl; - private WebSocket webSocket; - private OkHttpClient clonedClient; - public WatchConnectionManager(final OkHttpClient client, final BaseOperation baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout, int maxIntervalExponent) throws MalformedURLException { - this.listOptions = listOptions; - this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion()); + super( + watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, + client.newBuilder() + .readTimeout(websocketTimeout, TimeUnit.MILLISECONDS) + .build() + ); this.baseOperation = baseOperation; - this.watcher = watcher; - this.reconnectInterval = reconnectInterval; - this.reconnectLimit = reconnectLimit; - this.websocketTimeout = websocketTimeout; - this.maxIntervalExponent = maxIntervalExponent; - - this.clonedClient = client.newBuilder() - .readTimeout(this.websocketTimeout, TimeUnit.MILLISECONDS) - .build(); // The URL is created, validated and saved once, so that reconnect attempts don't have to deal with // MalformedURLExceptions that would never occur requestUrl = baseOperation.getNamespacedUrl(); - //create after the call above where MalformedURLException can be raised - //avoids having to call shutdown in case the exception is raised - executor = Executors.newSingleThreadScheduledExecutor(r -> { - Thread ret = new Thread(r, "Executor for Watch " + System.identityHashCode(WatchConnectionManager.this)); - ret.setDaemon(true); - return ret; - }); runWatch(); } @@ -107,8 +85,8 @@ public WatchConnectionManager(final OkHttpClient client, final BaseOperation= reconnectLimit && reconnectLimit >= 0) { - closeEvent(new KubernetesClientException("Connection failure", t)); + if (cannotReconnect()) { + closeEvent(new WatcherException("Connection failure", t)); return; } @@ -230,13 +208,10 @@ public void onMessage(WebSocket webSocket, String message) { if (object instanceof HasMetadata) { @SuppressWarnings("unchecked") T obj = (T) object; - // Dirty cast - should always be valid though - resourceVersion.set(((HasMetadata) obj).getMetadata().getResourceVersion()); + resourceVersion.set(obj.getMetadata().getResourceVersion()); Watcher.Action action = Watcher.Action.valueOf(event.getType()); watcher.eventReceived(action, obj); } else if (object instanceof KubernetesResourceList) { - @SuppressWarnings("unchecked") - KubernetesResourceList list = (KubernetesResourceList) object; // Dirty cast - should always be valid though resourceVersion.set(list.getMetadata().getResourceVersion()); @@ -255,18 +230,16 @@ public void onMessage(WebSocket webSocket, String message) { webSocketRef.set(null); // lose the ref: closing in close() would only generate a Broken pipe // exception // shut down executor, etc. - closeEvent(new KubernetesClientException(status)); + closeEvent(new WatcherException(status.getMessage(), new KubernetesClientException(status))); close(); return; } watcher.eventReceived(Action.ERROR, null); - logger.error("Error received: {}", status.toString()); + logger.error("Error received: {}", status); } else { logger.error("Unknown message received: {}", message); } - } catch (IOException e) { - logger.error("Could not deserialize watch event: {}", message, e); } catch (ClassCastException e) { logger.error("Received wrong type of object for watch", e); } catch (IllegalArgumentException e) { @@ -286,8 +259,8 @@ public void onClosed(WebSocket webSocket, int code, String reason) { logger.debug("Ignoring onClose for already closed/closing websocket"); return; } - if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) { - closeEvent(new KubernetesClientException("Connection unexpectedly closed")); + if (cannotReconnect()) { + closeEvent(new WatcherException("Connection unexpectedly closed")); return; } scheduleReconnect(); @@ -298,43 +271,40 @@ public void onClosed(WebSocket webSocket, int code, String reason) { private void scheduleReconnect() { logger.debug("Submitting reconnect task to the executor"); - // Don't submit new tasks after having called shutdown() on executor - if(!executor.isShutdown()) { - // make sure that whichever thread calls this method, the tasks are - // performed serially in the executor - executor.submit(new NamedRunnable("scheduleReconnect") { - @Override - public void execute() { - if (!reconnectPending.compareAndSet(false, true)) { - logger.debug("Reconnect already scheduled"); - return; - } - webSocketRef.set(null); - try { - // actual reconnect only after the back-off time has passed, without - // blocking the thread - logger.debug("Scheduling reconnect task"); - executor.schedule(new NamedRunnable("reconnectAttempt") { - @Override - public void execute() { - try { - runWatch(); - reconnectPending.set(false); - } catch (Exception e) { - // An unexpected error occurred and we didn't even get an onFailure callback. - logger.error("Exception in reconnect", e); - webSocketRef.set(null); - closeEvent(new KubernetesClientException("Unhandled exception in reconnect attempt", e)); - close(); - } + // make sure that whichever thread calls this method, the tasks are + // performed serially in the executor + submit(new NamedRunnable("scheduleReconnect") { + @Override + public void execute() { + if (!reconnectPending.compareAndSet(false, true)) { + logger.debug("Reconnect already scheduled"); + return; + } + webSocketRef.set(null); + try { + // actual reconnect only after the back-off time has passed, without + // blocking the thread + logger.debug("Scheduling reconnect task"); + schedule(new NamedRunnable("reconnectAttempt") { + @Override + public void execute() { + try { + runWatch(); + reconnectPending.set(false); + } catch (Exception e) { + // An unexpected error occurred and we didn't even get an onFailure callback. + logger.error("Exception in reconnect", e); + webSocketRef.set(null); + closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e)); + close(); } - }, nextReconnectInterval(), TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException e) { - reconnectPending.set(false); - } + } + }, nextReconnectInterval(), TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + reconnectPending.set(false); } - }); - } + } + }); } public void waitUntilReady() { @@ -344,75 +314,9 @@ public void waitUntilReady() { @Override public void close() { logger.debug("Force closing the watch {}", this); - closeEvent(null); + closeEvent(); closeWebSocket(webSocketRef.getAndSet(null)); - if (!executor.isShutdown()) { - try { - executor.shutdown(); - if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { - logger.warn("Executor didn't terminate in time after shutdown in close(), killing it in: {}", this); - executor.shutdownNow(); - } - } catch (Throwable t) { - throw KubernetesClientException.launderThrowable(t); - } - } + closeExecutorService(); } - private void closeEvent(KubernetesClientException cause) { - if (forceClosed.getAndSet(true)) { - logger.debug("Ignoring duplicate firing of onClose event"); - return; - } - watcher.onClose(cause); - } - - private void closeWebSocket(WebSocket ws) { - if (ws != null) { - logger.debug("Closing websocket {}", ws); - try { - if (!ws.close(1000, null)) { - logger.warn("Failed to close websocket"); - } - } catch (IllegalStateException e) { - logger.error("Called close on already closed websocket: {} {}", e.getClass(), e.getMessage()); - } - } - } - - private long nextReconnectInterval() { - int exponentOfTwo = currentReconnectAttempt.getAndIncrement(); - if (exponentOfTwo > maxIntervalExponent) - exponentOfTwo = maxIntervalExponent; - long ret = reconnectInterval * (1 << exponentOfTwo); - logger.debug("Current reconnect backoff is " + ret + " milliseconds (T" + exponentOfTwo + ")"); - return ret; - } - - private static abstract class NamedRunnable implements Runnable { - private final String name; - - public NamedRunnable(String name) { - this.name = Objects.requireNonNull(name); - } - - private void tryToSetName(String value) { - try { - Thread.currentThread().setName(value); - } catch (SecurityException ignored) { - } - } - - public final void run() { - String oldName = Thread.currentThread().getName(); - tryToSetName(this.name + "|" + oldName); - try { - execute(); - } finally { - tryToSetName(oldName); - } - } - - protected abstract void execute(); - } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index 4459d3128dc..03cda81a0eb 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -23,15 +23,21 @@ import io.fabric8.kubernetes.api.model.Status; import io.fabric8.kubernetes.api.model.WatchEvent; import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.Watcher.Action; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.base.BaseOperation; import io.fabric8.kubernetes.client.dsl.base.OperationSupport; import io.fabric8.kubernetes.client.utils.HttpClientUtils; import io.fabric8.kubernetes.client.utils.Serialization; import io.fabric8.kubernetes.client.utils.Utils; -import okhttp3.*; +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.HttpUrl; +import okhttp3.Interceptor; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; import okhttp3.logging.HttpLoggingInterceptor; import okio.BufferedSource; import org.slf4j.Logger; @@ -41,36 +47,19 @@ import java.net.MalformedURLException; import java.net.URL; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import static java.net.HttpURLConnection.HTTP_GONE; -public class WatchHTTPManager> implements - Watch { +public class WatchHTTPManager> extends AbstractWatchManager { private static final Logger logger = LoggerFactory.getLogger(WatchHTTPManager.class); private final BaseOperation baseOperation; - private final Watcher watcher; - private final AtomicBoolean forceClosed = new AtomicBoolean(); - private final ListOptions listOptions; - private final AtomicReference resourceVersion; - private final int reconnectLimit; - private final int reconnectInterval; private final AtomicBoolean reconnectPending = new AtomicBoolean(false); - private int maxIntervalExponent; private final URL requestUrl; - private final AtomicInteger currentReconnectAttempt = new AtomicInteger(0); - private OkHttpClient clonedClient; - - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> { - Thread ret = new Thread(r, "Executor for Watch " + System.identityHashCode(WatchHTTPManager.this)); - ret.setDaemon(true); - return ret; - }); public WatchHTTPManager(final OkHttpClient client, final BaseOperation baseOperation, @@ -87,19 +76,16 @@ public WatchHTTPManager(final OkHttpClient client, final int reconnectLimit, long connectTimeout, int maxIntervalExponent) throws MalformedURLException { - this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion()); - this.listOptions = listOptions; + super( + watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, + client.newBuilder() + .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) + .readTimeout(0, TimeUnit.MILLISECONDS) + .cache(null) + .build() + ); this.baseOperation = baseOperation; - this.watcher = watcher; - this.reconnectInterval = reconnectInterval; - this.reconnectLimit = reconnectLimit; - this.maxIntervalExponent = maxIntervalExponent; - OkHttpClient clonedClient = client.newBuilder() - .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) - .readTimeout(0, TimeUnit.MILLISECONDS) - .cache(null) - .build(); // If we set the HttpLoggingInterceptor's logging level to Body (as it is by default), it does // not let us stream responses from the server. @@ -110,12 +96,11 @@ public WatchHTTPManager(final OkHttpClient client, } } - this.clonedClient = clonedClient; requestUrl = baseOperation.getNamespacedUrl(); runWatch(); } - private final void runWatch() { + private void runWatch() { logger.debug("Watching via HTTP GET ... {}", this); HttpUrl.Builder httpUrlBuilder = HttpUrl.get(requestUrl).newBuilder(); @@ -197,48 +182,45 @@ private void scheduleReconnect() { return; } - if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) { - watcher.onClose(new KubernetesClientException("Connection unexpectedly closed")); + if (cannotReconnect()) { + watcher.onClose(new WatcherException("Connection unexpectedly closed")); return; } logger.debug("Submitting reconnect task to the executor"); - // Don't submit new tasks after having called shutdown() on executor - if(!executor.isShutdown()) { - // make sure that whichever thread calls this method, the tasks are - // performed serially in the executor. - executor.submit(() -> { - if (!reconnectPending.compareAndSet(false, true)) { - logger.debug("Reconnect already scheduled"); - return; - } - try { - // actual reconnect only after the back-off time has passed, without - // blocking the thread - logger.debug("Scheduling reconnect task"); - executor.schedule(() -> { - try { - WatchHTTPManager.this.runWatch(); - reconnectPending.set(false); - } catch (Exception e) { - // An unexpected error occurred and we didn't even get an onFailure callback. - logger.error("Exception in reconnect", e); - close(); - watcher.onClose(new KubernetesClientException("Unhandled exception in reconnect attempt", e)); - } - }, nextReconnectInterval(), TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException e) { - // This is a standard exception if we close the scheduler. We should not print it - if (!forceClosed.get()) { + // make sure that whichever thread calls this method, the tasks are + // performed serially in the executor. + submit(() -> { + if (!reconnectPending.compareAndSet(false, true)) { + logger.debug("Reconnect already scheduled"); + return; + } + try { + // actual reconnect only after the back-off time has passed, without + // blocking the thread + logger.debug("Scheduling reconnect task"); + schedule(() -> { + try { + WatchHTTPManager.this.runWatch(); + reconnectPending.set(false); + } catch (Exception e) { + // An unexpected error occurred and we didn't even get an onFailure callback. logger.error("Exception in reconnect", e); + close(); + watcher.onClose(new WatcherException("Unhandled exception in reconnect attempt", e)); } - reconnectPending.set(false); + }, nextReconnectInterval(), TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + // This is a standard exception if we close the scheduler. We should not print it + if (!forceClosed.get()) { + logger.error("Exception in reconnect", e); } - }); - } + reconnectPending.set(false); + } + }); } - public void onMessage(String messageSource) throws IOException { + public void onMessage(String messageSource) { try { WatchEvent event = readWatchEvent(messageSource); KubernetesResource object = event.getObject(); @@ -246,19 +228,16 @@ public void onMessage(String messageSource) throws IOException { @SuppressWarnings("unchecked") T obj = (T) object; // Dirty cast - should always be valid though - resourceVersion.set(((HasMetadata) obj).getMetadata().getResourceVersion()); + resourceVersion.set(obj.getMetadata().getResourceVersion()); Watcher.Action action = Watcher.Action.valueOf(event.getType()); watcher.eventReceived(action, obj); } else if (object instanceof KubernetesResourceList) { - @SuppressWarnings("unchecked") - KubernetesResourceList list = (KubernetesResourceList) object; // Dirty cast - should always be valid though resourceVersion.set(list.getMetadata().getResourceVersion()); Watcher.Action action = Watcher.Action.valueOf(event.getType()); List items = list.getItems(); if (items != null) { - String name = baseOperation.getName(); for (HasMetadata item : items) { watcher.eventReceived(action, (T) item); } @@ -270,17 +249,15 @@ public void onMessage(String messageSource) throws IOException { // exception // shut down executor, etc. close(); - watcher.onClose(new KubernetesClientException(status)); + watcher.onClose(new WatcherException(status.getMessage(), new KubernetesClientException(status))); return; } watcher.eventReceived(Action.ERROR, null); - logger.error("Error received: {}", status.toString()); + logger.error("Error received: {}", status); } else { logger.error("Unknown message received: {}", messageSource); } - } catch (IOException e) { - logger.error("Could not deserialize watch event: {}", messageSource, e); } catch (ClassCastException e) { logger.error("Received wrong type of object for watch", e); } catch (IllegalArgumentException e) { @@ -288,11 +265,11 @@ public void onMessage(String messageSource) throws IOException { } } - protected static WatchEvent readWatchEvent(String messageSource) throws IOException { + protected static WatchEvent readWatchEvent(String messageSource) { WatchEvent event = Serialization.unmarshal(messageSource, WatchEvent.class); KubernetesResource object = null; if (event != null) { - object = event.getObject();; + object = event.getObject(); } // when watching API Groups we don't get a WatchEvent resource // so the object will be null @@ -312,29 +289,10 @@ protected static WatchEvent readWatchEvent(String messageSource) throws IOExcept return event; } - private long nextReconnectInterval() { - int exponentOfTwo = currentReconnectAttempt.getAndIncrement(); - if (exponentOfTwo > maxIntervalExponent) - exponentOfTwo = maxIntervalExponent; - long ret = reconnectInterval * (1 << exponentOfTwo); - logger.debug("Current reconnect backoff is " + ret + " milliseconds (T" + exponentOfTwo + ")"); - return ret; - } - @Override public void close() { logger.debug("Force closing the watch {}", this); - forceClosed.set(true); - if (!executor.isShutdown()) { - try { - executor.shutdown(); - if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { - logger.warn("Executor didn't terminate in time after shutdown in close(), killing it in: {}", this); - executor.shutdownNow(); - } - } catch (Throwable t) { - throw KubernetesClientException.launderThrowable(t); - } - } + closeEvent(); + closeExecutorService(); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ReflectorWatcher.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ReflectorWatcher.java index a1c4f700ee8..18ddb482951 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ReflectorWatcher.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ReflectorWatcher.java @@ -16,13 +16,12 @@ package io.fabric8.kubernetes.client.informers.cache; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.api.model.Status; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.HttpURLConnection; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; @@ -70,18 +69,21 @@ public void eventReceived(Action action, T resource) { } @Override - public void onClose(KubernetesClientException exception) { + public void onClose(WatcherException exception) { log.error("Watch closing"); Optional.ofNullable(exception) .map(e -> { log.debug("Exception received during watch", e); return exception; }) - .map(KubernetesClientException::getStatus) - .map(Status::getCode) - .filter(c -> c.equals(HttpURLConnection.HTTP_GONE)) + .filter(WatcherException::isHttpGone) .ifPresent(c -> onHttpGone.run()); onClose.run(); } + @Override + public void onClose() { + log.info("Watch gracefully closed"); + onClose.run(); + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/internal/readiness/ReadinessWatcher.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/internal/readiness/ReadinessWatcher.java index 608f3f09aac..3c9a599343c 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/internal/readiness/ReadinessWatcher.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/internal/readiness/ReadinessWatcher.java @@ -20,13 +20,12 @@ import java.util.concurrent.atomic.AtomicReference; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.KubernetesClientTimeoutException; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; public class ReadinessWatcher implements Watcher { - private final CountDownLatch latch = new CountDownLatch(1); private final AtomicReference reference = new AtomicReference<>(); @@ -49,7 +48,7 @@ public void eventReceived(Action action, T resource) { } @Override - public void onClose(KubernetesClientException e) { + public void onClose(WatcherException e) { } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/WatcherToggle.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/WatcherToggle.java index dc239946a08..45d6851c353 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/WatcherToggle.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/WatcherToggle.java @@ -16,8 +16,8 @@ package io.fabric8.kubernetes.client.utils; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; import java.util.Objects; @@ -26,7 +26,7 @@ */ public class WatcherToggle implements Watcher { - private Watcher delegate; + private final Watcher delegate; private boolean enabled; @@ -51,9 +51,16 @@ public void eventReceived(Action action, T resource) { } @Override - public void onClose(KubernetesClientException cause) { + public void onClose(WatcherException cause) { if (enabled) { delegate.onClose(cause); } } + + @Override + public void onClose() { + if (enabled) { + delegate.onClose(); + } + } } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/base/BaseOperationWatchTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/base/BaseOperationWatchTest.java new file mode 100644 index 00000000000..0ae45acbd27 --- /dev/null +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/base/BaseOperationWatchTest.java @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.dsl.base; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.StatusBuilder; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.Watch; +import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.dsl.PodResource; +import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager; +import io.fabric8.kubernetes.client.dsl.internal.WatchHTTPManager; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.MockedConstruction; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@SuppressWarnings({"rawtypes", "FieldCanBeLocal"}) +class BaseOperationWatchTest { + + private Watcher watcher; + private OperationContext operationContext; + private BaseOperation> baseOperation; + + @SuppressWarnings("unchecked") + @BeforeEach + void setUp() { + watcher = mock(Watcher.class); + operationContext = mock(OperationContext.class, RETURNS_DEEP_STUBS); + baseOperation = new BaseOperation<>(operationContext); + } + + @Test + @DisplayName("watch, with exception on connection open, should throw Exception and close WatchConnectionManager") + void watchWithExceptionOnOpen() { + try (final MockedConstruction m = mockConstruction(WatchConnectionManager.class, (mock, context) -> { + // Given + doThrow(new KubernetesClientException("Mocked Connection Error")).when(mock).waitUntilReady(); + })) { + // When + final KubernetesClientException result = assertThrows(KubernetesClientException.class, + () -> { + baseOperation.watch(watcher); + fail(); + }); + // Then + assertThat(result).hasMessage("Mocked Connection Error"); + assertThat(m.constructed()) + .hasSize(1) + .element(0) + .matches(wcm -> { + verify(wcm, times(1)).close(); + return true; + }); + } + } + + @Test + @DisplayName("watch, with retryable exception on connection open, should close initial WatchConnectionManager and retry") + void watchWithRetryableExceptionOnOpen() { + try ( + final MockedConstruction m = mockConstruction(WatchConnectionManager.class, (mock, context) -> { + // Given + doThrow(new KubernetesClientException(new StatusBuilder().withCode(503).build())).when(mock).waitUntilReady(); + }); + final MockedConstruction mHttp = mockConstruction(WatchHTTPManager.class) + ) { + // When + final Watch result = baseOperation.watch(watcher); + // Then + assertThat(result).isInstanceOf(WatchHTTPManager.class).isSameAs(mHttp.constructed().get(0)); + assertThat(m.constructed()) + .hasSize(1) + .element(0) + .matches(wcm -> { + verify(wcm, times(1)).close(); + return true; + }); + } + } +} diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcherTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcherTest.java index 2ec33deeb52..af0d2afdb8c 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcherTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcherTest.java @@ -26,13 +26,13 @@ import java.util.concurrent.ExecutionException; import java.util.function.Predicate; +import io.fabric8.kubernetes.client.WatcherException; import org.junit.jupiter.api.Test; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ConfigMapBuilder; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher.Action; -import io.fabric8.kubernetes.client.dsl.base.WaitForConditionWatcher.WatchException; class WaitForConditionWatcherTest { @@ -111,8 +111,8 @@ void itCompletesExceptionallyOnError() throws Exception { watcher.getFuture().get(); fail("should have thrown exception"); } catch (ExecutionException e) { - assertEquals(e.getCause().getClass(), WatchException.class); - assertEquals(e.getCause().getMessage(), "Action.ERROR received"); + assertEquals(WatcherException.class, e.getCause().getClass()); + assertEquals("Action.ERROR received", e.getCause().getMessage()); } assertFalse(condition.isCalled()); } @@ -121,15 +121,15 @@ void itCompletesExceptionallyOnError() throws Exception { void itCompletesExceptionallyWithRetryOnCloseNonGone() throws Exception { TrackingPredicate condition = condition(ss -> true); WaitForConditionWatcher watcher = new WaitForConditionWatcher<>(condition); - watcher.onClose(new KubernetesClientException("test", 500, null)); + watcher.onClose(new WatcherException("Watcher closed", new KubernetesClientException("test", 500, null))); assertTrue(watcher.getFuture().isDone()); try { watcher.getFuture().get(); fail("should have thrown exception"); } catch (ExecutionException e) { - assertEquals(e.getCause().getClass(), WatchException.class); - assertEquals(e.getCause().getMessage(), "Watcher closed"); - assertTrue(((WatchException) e.getCause()).isShouldRetry()); + assertEquals(WatcherException.class, e.getCause().getClass()); + assertEquals("Watcher closed", e.getCause().getMessage()); + assertTrue(((WatcherException) e.getCause()).isShouldRetry()); } assertFalse(condition.isCalled()); } @@ -138,19 +138,35 @@ void itCompletesExceptionallyWithRetryOnCloseNonGone() throws Exception { void itCompletesExceptionallyWithNoRetryOnCloseGone() throws Exception { TrackingPredicate condition = condition(ss -> true); WaitForConditionWatcher watcher = new WaitForConditionWatcher<>(condition); - watcher.onClose(new KubernetesClientException("test", HttpURLConnection.HTTP_GONE, null)); + watcher.onClose(new WatcherException("Watcher closed", new KubernetesClientException("test", HttpURLConnection.HTTP_GONE, null))); assertTrue(watcher.getFuture().isDone()); try { watcher.getFuture().get(); fail("should have thrown exception"); } catch (ExecutionException e) { - assertEquals(e.getCause().getClass(), WatchException.class); - assertEquals(e.getCause().getMessage(), "Watcher closed"); - assertFalse(((WatchException) e.getCause()).isShouldRetry()); + assertEquals(WatcherException.class, e.getCause().getClass()); + assertEquals("Watcher closed", e.getCause().getMessage()); + assertFalse(((WatcherException) e.getCause()).isShouldRetry()); } assertFalse(condition.isCalled()); } + @Test + void itCompletesExceptionallyWithRetryOnGracefulClose() throws Exception { + TrackingPredicate condition = condition(ss -> true); + WaitForConditionWatcher watcher = new WaitForConditionWatcher<>(condition); + watcher.onClose(); + assertTrue(watcher.getFuture().isDone()); + try { + watcher.getFuture().get(); + fail("should have thrown exception"); + } catch (ExecutionException e) { + assertEquals(WatcherException.class, e.getCause().getClass()); + assertEquals("Watcher closed", e.getCause().getMessage()); + assertTrue(((WatcherException) e.getCause()).isShouldRetry()); + } + assertFalse(condition.isCalled()); + } private TrackingPredicate condition(Predicate condition) { return new TrackingPredicate(condition); } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java new file mode 100644 index 00000000000..023d8d5ff09 --- /dev/null +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java @@ -0,0 +1,221 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.dsl.internal; + +import io.fabric8.kubernetes.api.model.ListOptions; +import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; +import okhttp3.OkHttpClient; +import okhttp3.WebSocket; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class AbstractWatchManagerTest { + + private MockedStatic executors; + private ScheduledExecutorService executorService; + + @BeforeEach + void setUp() { + executorService = mock(ScheduledExecutorService.class, RETURNS_DEEP_STUBS); + executors = mockStatic(Executors.class); + executors.when(() -> Executors.newSingleThreadScheduledExecutor(any())).thenReturn(executorService); + } + + @AfterEach + void tearDown() { + executors.close(); + } + + @Test + @DisplayName("closeEvent, is idempotent, multiple calls only close watcher once") + void closeEventIsIdempotent() { + // Given + final WatcherAdapter watcher = new WatcherAdapter<>(); + final WatchManager awm = withDefaultWatchManager(watcher); + // When + for (int it = 0; it < 10; it++) { + awm.closeEvent(); + } + // Then + assertThat(watcher.closeCount.get()).isEqualTo(1); + } + + @Test + @DisplayName("closeEvent with Exception, is idempotent, multiple calls only close watcher once") + void closeEventWithExceptionIsIdempotent() { + // Given + final WatcherAdapter watcher = new WatcherAdapter<>(); + final WatchManager awm = withDefaultWatchManager(watcher); + // When + for (int it = 0; it < 10; it++) { + awm.closeEvent(new WatcherException("Mock")); + } + // Then + assertThat(watcher.closeCount.get()).isEqualTo(1); + } + + @Test + @DisplayName("closeWebSocket, closes web socket with 1000 code (Normal Closure)") + void closeWebSocket() { + // Given + final WebSocket webSocket = mock(WebSocket.class); + // When + AbstractWatchManager.closeWebSocket(webSocket); + // Then + verify(webSocket, times(1)).close(1000, null); + } + + @Test + @DisplayName("closeExecutorService, with graceful termination") + void closeExecutorServiceGracefully() throws InterruptedException{ + // Given + final WatchManager awm = withDefaultWatchManager(null); + when(executorService.awaitTermination(1, TimeUnit.SECONDS)).thenReturn(true); + // When + awm.closeExecutorService(); + // Then + verify(executorService, times(1)).shutdown(); + verify(executorService, times(0)).shutdownNow(); + } + + @Test + @DisplayName("closeExecutorService, with shutdownNow") + void closeExecutorServiceNow() throws InterruptedException { + // Given + final WatchManager awm = withDefaultWatchManager(null); + when(executorService.awaitTermination(1, TimeUnit.SECONDS)).thenReturn(false); + // When + awm.closeExecutorService(); + // Then + verify(executorService, times(1)).shutdown(); + verify(executorService, times(1)).shutdownNow(); + } + + @Test + @DisplayName("submit, executor not shutdown, should submit") + void submitWhenIsNotShutdown() { + // Given + final WatchManager awm = withDefaultWatchManager(null); + // When + awm.submit(() -> {}); + // Then + verify(executorService, times(1)).submit(any(Runnable.class)); + } + + @Test + @DisplayName("submit, executor shutdown, should NOT submit") + void submitWhenIsShutdown() { + // Given + final WatchManager awm = withDefaultWatchManager(null); + when(executorService.isShutdown()).thenReturn(true); + // When + awm.submit(() -> {}); + // Then + verify(executorService, times(0)).submit(any(Runnable.class)); + } + + @Test + @DisplayName("schedule, executor not shutdown, should submit") + void scheduleWhenIsNotShutdown() { + // Given + final WatchManager awm = withDefaultWatchManager(null); + // When + awm.schedule(() -> {}, 0, TimeUnit.SECONDS); + // Then + verify(executorService, times(1)).schedule(any(Runnable.class), anyLong(), any()); + } + + @Test + @DisplayName("schedule, executor shutdown, should NOT submit") + void scheduleWhenIsShutdown() { + // Given + final WatchManager awm = withDefaultWatchManager(null); + when(executorService.isShutdown()).thenReturn(true); + // When + awm.schedule(() -> {}, 0, TimeUnit.SECONDS); + // Then + verify(executorService, times(0)).schedule(any(Runnable.class), anyLong(), any()); + } + + @Test + @DisplayName("nextReconnectInterval, returns exponential interval values up to the provided limit") + void nextReconnectInterval() { + // Given + final WatchManager awm = new WatchManager<>( + null, mock(ListOptions.class), 0, 10, 5, null); + // When-Then + assertThat(awm.nextReconnectInterval()).isEqualTo(10); + assertThat(awm.nextReconnectInterval()).isEqualTo(20); + assertThat(awm.nextReconnectInterval()).isEqualTo(40); + assertThat(awm.nextReconnectInterval()).isEqualTo(80); + assertThat(awm.nextReconnectInterval()).isEqualTo(160); + assertThat(awm.nextReconnectInterval()).isEqualTo(320); + assertThat(awm.nextReconnectInterval()).isEqualTo(320); + } + + private static WatchManager withDefaultWatchManager(Watcher watcher) { + return new WatchManager<>( + watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 0, 0, 0, + mock(OkHttpClient.class)); + } + + private static class WatcherAdapter implements Watcher { + private final AtomicInteger closeCount = new AtomicInteger(0); + + @Override + public void eventReceived(Action action, T resource) { + + } + @Override + public void onClose(WatcherException cause) { + closeCount.addAndGet(1); + } + + @Override + public void onClose() { + closeCount.addAndGet(1); + } + } + + private static final class WatchManager extends AbstractWatchManager { + + public WatchManager(Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, OkHttpClient clonedClient) { + super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, clonedClient); + } + @Override + public void close() { + } + } +} diff --git a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/CRDExample.java b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/CRDExample.java index 8a8000b5bde..9f3dbd9ccc2 100644 --- a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/CRDExample.java +++ b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/CRDExample.java @@ -25,6 +25,7 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; import io.fabric8.kubernetes.client.dsl.Resource; @@ -171,12 +172,12 @@ public void eventReceived(Action action, Dummy resource) { } @Override - public void onClose(KubernetesClientException cause) { + public void onClose(WatcherException cause) { } }); System.in.read(); - + } catch (KubernetesClientException e) { logger.error(e.getMessage(), e); } catch (Exception e) { diff --git a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/CronJobExample.java b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/CronJobExample.java index a30be95bc35..a1835ce0565 100644 --- a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/CronJobExample.java +++ b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/CronJobExample.java @@ -33,7 +33,7 @@ public class CronJobExample { private static final Logger logger = LoggerFactory.getLogger(CronJobExample.class); - public static void main(String args[]) throws InterruptedException { + public static void main(String[] args) { String master = "https://localhost:8443/"; if (args.length == 1) { master = args[0]; @@ -86,7 +86,7 @@ public void eventReceived(Action action, Pod aPod) { } @Override - public void onClose(KubernetesClientException e) { + public void onClose(WatcherException e) { // Ignore } })) { diff --git a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/FullExample.java b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/FullExample.java index 614fea88379..2f2c02f96e7 100644 --- a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/FullExample.java +++ b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/FullExample.java @@ -22,22 +22,19 @@ import io.fabric8.kubernetes.api.model.ReplicationControllerBuilder; import io.fabric8.kubernetes.api.model.ResourceQuota; import io.fabric8.kubernetes.api.model.ResourceQuotaBuilder; -import io.fabric8.kubernetes.api.model.Status; import io.fabric8.kubernetes.api.model.ServiceBuilder; import io.fabric8.kubernetes.client.APIGroupNotAvailableException; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.ConfigBuilder; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.internal.SerializationUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static io.fabric8.kubernetes.client.Watcher.Action.ERROR; - public class FullExample { private static final Logger logger = LoggerFactory.getLogger(FullExample.class); @@ -57,7 +54,7 @@ public void eventReceived(Action action, ReplicationController resource) { } @Override - public void onClose(KubernetesClientException e) { + public void onClose(WatcherException e) { if (e != null) { e.printStackTrace(); logger.error(e.getMessage(), e); diff --git a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/RawCustomResourceExample.java b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/RawCustomResourceExample.java index 17afc13a13b..943b1bb27ab 100644 --- a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/RawCustomResourceExample.java +++ b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/RawCustomResourceExample.java @@ -20,6 +20,7 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +73,7 @@ public void eventReceived(Action action, String resource) { } @Override - public void onClose(KubernetesClientException e) { + public void onClose(WatcherException e) { logger.debug("Watcher onClose"); closeLatch.countDown(); if (e != null) { diff --git a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/WatchExample.java b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/WatchExample.java index e7f1fdd9517..112a2e4c1a9 100644 --- a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/WatchExample.java +++ b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/WatchExample.java @@ -23,6 +23,7 @@ import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +45,7 @@ public void eventReceived(Action action, ReplicationController resource) { } @Override - public void onClose(KubernetesClientException e) { + public void onClose(WatcherException e) { logger.debug("Watcher onClose"); if (e != null) { logger.error(e.getMessage(), e); diff --git a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/kubectl/equivalents/PodWatchEquivalent.java b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/kubectl/equivalents/PodWatchEquivalent.java index 643f032502f..463904b5b11 100644 --- a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/kubectl/equivalents/PodWatchEquivalent.java +++ b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/kubectl/equivalents/PodWatchEquivalent.java @@ -18,8 +18,8 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,14 +42,14 @@ public static void main(String[] args) { @Override public void eventReceived(Action action, Pod pod) { logger.info("{} {}", action.name(), pod.getMetadata().getName()); - switch (action.name()) { - case "ADDED": + switch (action) { + case ADDED: logger.info("{} got added", pod.getMetadata().getName()); break; - case "DELETED": + case DELETED: logger.info("{} got deleted", pod.getMetadata().getName()); break; - case "MODIFIED": + case MODIFIED: logger.info("{} got modified", pod.getMetadata().getName()); break; default: @@ -58,7 +58,7 @@ public void eventReceived(Action action, Pod pod) { } @Override - public void onClose(KubernetesClientException e) { + public void onClose(WatcherException e) { logger.info( "Closed"); isWatchClosed.countDown(); } diff --git a/kubernetes-examples/src/main/java/io/fabric8/openshift/examples/WatchBuildConfigs.java b/kubernetes-examples/src/main/java/io/fabric8/openshift/examples/WatchBuildConfigs.java index cccfd1bdf5e..9afbf762c2c 100644 --- a/kubernetes-examples/src/main/java/io/fabric8/openshift/examples/WatchBuildConfigs.java +++ b/kubernetes-examples/src/main/java/io/fabric8/openshift/examples/WatchBuildConfigs.java @@ -19,6 +19,7 @@ import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.openshift.api.model.BuildConfig; import io.fabric8.openshift.client.DefaultOpenShiftClient; import io.fabric8.openshift.client.OpenShiftClient; @@ -36,7 +37,7 @@ public void eventReceived(Action action, BuildConfig resource) { } @Override - public void onClose(KubernetesClientException cause) { + public void onClose(WatcherException cause) { System.out.println("Watch Closed: " + cause); if (cause != null) { cause.printStackTrace(); diff --git a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/WatchIT.java b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/WatchIT.java index af63eb35f40..e2aa245c381 100644 --- a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/WatchIT.java +++ b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/WatchIT.java @@ -71,7 +71,12 @@ public void eventReceived(Action action, Pod pod) { } @Override - public void onClose(KubernetesClientException e) { + public void onClose(WatcherException cause) { + + } + + @Override + public void onClose() { closeLatch.countDown(); logger.info("watch closed..."); } diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/CustomResourceTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/CustomResourceTest.java index 589f5aeb40c..4e1f5acc372 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/CustomResourceTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/CustomResourceTest.java @@ -19,7 +19,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.io.IOException; import java.net.HttpURLConnection; @@ -37,6 +36,7 @@ import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.api.model.WatchEvent; +import io.fabric8.kubernetes.client.WatcherException; import okhttp3.mockwebserver.RecordedRequest; import org.junit.Rule; import org.junit.jupiter.api.Assertions; @@ -294,7 +294,7 @@ void testWatchAllResource() throws IOException, InterruptedException { @Override public void eventReceived(Action action, String resource) { anyEventReceived.countDown(); } @Override - public void onClose(KubernetesClientException cause) { } + public void onClose(WatcherException cause) { } }); // Then @@ -323,7 +323,7 @@ void testWatchSingleResource() throws IOException, InterruptedException { @Override public void eventReceived(Action action, String resource) { anyEventReceieved.countDown(); } @Override - public void onClose(KubernetesClientException cause) { } + public void onClose(WatcherException cause) { } }); // Then @@ -352,7 +352,7 @@ void testWatchWithLabels() throws IOException, InterruptedException { @Override public void eventReceived(Action action, String resource) { anyEventReceived.countDown(); } @Override - public void onClose(KubernetesClientException cause) { } + public void onClose(WatcherException cause) { } }); // Then @@ -383,7 +383,7 @@ void testWatchSomeResourceVersion() throws IOException, InterruptedException { @Override public void eventReceived(Action action, String resource) { anyEventReceived.countDown(); } @Override - public void onClose(KubernetesClientException cause) { } + public void onClose(WatcherException cause) { } }); // Then @@ -416,7 +416,7 @@ void testWatchWithListOptions() throws IOException, InterruptedException { @Override public void eventReceived(Action action, String resource) { anyEventReceived.countDown(); } @Override - public void onClose(KubernetesClientException cause) { } + public void onClose(WatcherException cause) { } }); // Then diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/EventTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/EventTest.java index 988f6e1e541..d9e7d8a73ed 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/EventTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/EventTest.java @@ -22,9 +22,9 @@ import io.fabric8.kubernetes.api.model.ObjectReferenceBuilder; import io.fabric8.kubernetes.api.model.WatchEvent; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.server.mock.KubernetesServer; import io.fabric8.kubernetes.client.utils.Utils; import org.junit.Rule; @@ -72,7 +72,7 @@ public void eventReceived(Action action, Event resource) { } @Override - public void onClose(KubernetesClientException cause) { + public void onClose(WatcherException cause) { } }); diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodCrudTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodCrudTest.java index 3f0c3f6f70c..7337ce64b00 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodCrudTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodCrudTest.java @@ -21,9 +21,9 @@ import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.api.model.PodSpecBuilder; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.server.mock.KubernetesServer; import junit.framework.AssertionFailedError; import org.junit.Rule; @@ -41,13 +41,13 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; @EnableRuleMigrationSupport -public class PodCrudTest { +class PodCrudTest { @Rule public KubernetesServer server = new KubernetesServer(true, true); @Test - public void testCrud() { KubernetesClient client = server.getClient(); + void testCrud() { KubernetesClient client = server.getClient(); Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").addToLabels("testKey", "testValue").endMetadata().build(); Pod pod2 = new PodBuilder().withNewMetadata().withName("pod2").addToLabels("testKey", "testValue").endMetadata().build(); Pod pod3 = new PodBuilder().withNewMetadata().withName("pod3").endMetadata().build(); @@ -97,37 +97,13 @@ public class PodCrudTest { } @Test - public void testPodWatchOnName() throws InterruptedException { + void testPodWatchOnName() throws InterruptedException { KubernetesClient client = server.getClient(); Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").addToLabels("testKey", "testValue").endMetadata().build(); - final CountDownLatch deleteLatch = new CountDownLatch(1); - final CountDownLatch closeLatch = new CountDownLatch(1); - final CountDownLatch editLatch = new CountDownLatch(2); - final CountDownLatch addLatch = new CountDownLatch(1); + final LatchedWatcher lw = new LatchedWatcher(1, 2, 1, 1, 1); pod1 = client.pods().inNamespace("ns1").create(pod1); - Watch watch = client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()).watch(new Watcher() { - @Override - public void eventReceived(Action action, Pod resource) { - switch (action) { - case DELETED: - deleteLatch.countDown(); - break; - case MODIFIED: - editLatch.countDown(); - break; - case ADDED: - addLatch.countDown(); - break; - default: - throw new AssertionFailedError(action.toString().concat(" isn't recognised.")); - } - } - @Override - public void onClose(KubernetesClientException cause) { - closeLatch.countDown(); - } - }); + Watch watch = client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()).watch(lw); pod1 = client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()) .patch(new PodBuilder().withNewMetadataLike(pod1.getMetadata()).endMetadata().build()); @@ -141,49 +117,24 @@ public void onClose(KubernetesClientException cause) { client.pods().inNamespace("ns1").create(new PodBuilder().withNewMetadata().withName("pod1").addToLabels("testKey", "testValue").endMetadata().build()); assertEquals(1, client.pods().inNamespace("ns1").list().getItems().size()); - assertTrue(addLatch.await(1, TimeUnit.MINUTES)); - assertTrue(editLatch.await(1, TimeUnit.MINUTES)); - assertTrue(deleteLatch.await(1, TimeUnit.MINUTES)); + assertTrue(lw.addLatch.await(1, TimeUnit.MINUTES)); + assertTrue(lw.editLatch.await(1, TimeUnit.MINUTES)); + assertTrue(lw.deleteLatch.await(1, TimeUnit.MINUTES)); watch.close(); - assertTrue(closeLatch.await(1, TimeUnit.MINUTES)); + assertTrue(lw.closeLatch.await(1, TimeUnit.MINUTES)); } @Test - public void testPodWatchOnNamespace() throws InterruptedException { + void testPodWatchOnNamespace() throws InterruptedException { KubernetesClient client = server.getClient(); Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").addToLabels("testKey", "testValue").endMetadata().build(); - final CountDownLatch deleteLatch = new CountDownLatch(3); - final CountDownLatch closeLatch = new CountDownLatch(1); - final CountDownLatch addLatch = new CountDownLatch(3); - final CountDownLatch editLatch = new CountDownLatch(2); + final LatchedWatcher lw = new LatchedWatcher(); client.pods().inNamespace("ns1").create(pod1); - Watch watch = client.pods().inNamespace("ns1").watch(new Watcher() { - @Override - public void eventReceived(Action action, Pod resource) { - switch (action) { - case DELETED: - deleteLatch.countDown(); - break; - case ADDED: - addLatch.countDown(); - break; - case MODIFIED: - editLatch.countDown(); - break; - default: - throw new AssertionFailedError(action.toString()); - } - } - @Override - public void onClose(KubernetesClientException cause) { - closeLatch.countDown(); - } - }); - + Watch watch = client.pods().inNamespace("ns1").watch(lw); client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()) .patch(new PodBuilder().withNewMetadataLike(pod1.getMetadata()).endMetadata().build()); @@ -201,45 +152,20 @@ public void onClose(KubernetesClientException cause) { assertEquals(0, client.pods().inNamespace("ns1").list().getItems().size()); watch.close(); - assertTrue(closeLatch.await(2, TimeUnit.MINUTES)); + assertTrue(lw.closeLatch.await(1, TimeUnit.MINUTES)); } @Test - public void testPodWatchOnLabels() throws InterruptedException { + void testPodWatchOnLabels() throws InterruptedException { KubernetesClient client = server.getClient(); Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").addToLabels("test", "watch").endMetadata().build(); - final CountDownLatch deleteLatch = new CountDownLatch(1); - final CountDownLatch closeLatch = new CountDownLatch(1); - final CountDownLatch addLatch = new CountDownLatch(2); - final CountDownLatch editLatch = new CountDownLatch(1); + final LatchedWatcher lw = new LatchedWatcher(2, 1, 1, 1, 1); client.pods().inNamespace("ns1").create(pod1); Watch watch = client.pods().inNamespace("ns1") .withLabels(new HashMap() {{ put("test", "watch");}}) - .watch(new Watcher() { - @Override - public void eventReceived(Action action, Pod resource) { - switch (action) { - case DELETED: - deleteLatch.countDown(); - break; - case ADDED: - addLatch.countDown(); - break; - case MODIFIED: - editLatch.countDown(); - break; - default: - throw new AssertionFailedError(action.toString()); - } - } - - @Override - public void onClose(KubernetesClientException cause) { - closeLatch.countDown(); - } - }); + .watch(lw); Map m = pod1.getMetadata().getLabels(); m.put("foo", "bar"); @@ -253,8 +179,8 @@ public void onClose(KubernetesClientException cause) { .build()); assertEquals(1, client.pods().inNamespace("ns1").list().getItems().size()); - assertTrue(deleteLatch.await(1, TimeUnit.MINUTES)); - assertTrue(editLatch.await(1, TimeUnit.MINUTES)); + assertTrue(lw.deleteLatch.await(1, TimeUnit.MINUTES)); + assertTrue(lw.editLatch.await(1, TimeUnit.MINUTES)); Pod pod2 = client.pods().inNamespace("ns1").create(new PodBuilder() .withNewMetadata().withName("pod2").addToLabels("foo", "bar").endMetadata() @@ -271,60 +197,82 @@ public void onClose(KubernetesClientException cause) { assertEquals(2, client.pods().inNamespace("ns1").list().getItems().size()); assertEquals(2, client.pods().inNamespace("ns1").withLabel("test", "watch").list().getItems().size()); - assertTrue(addLatch.await(1, TimeUnit.MINUTES)); + assertTrue(lw.addLatch.await(1, TimeUnit.MINUTES)); watch.close(); - assertTrue(closeLatch.await(1, TimeUnit.MINUTES)); + assertTrue(lw.closeLatch.await(1, TimeUnit.MINUTES)); } @Test - public void testPodWatchClientSocketError() throws InterruptedException { + void testPodWatchTryWithResources() throws InterruptedException { KubernetesClient client = server.getClient(); Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").addToLabels("testKey", "testValue").endMetadata().build(); - final CountDownLatch deleteLatch = new CountDownLatch(1); - final CountDownLatch closeLatch = new CountDownLatch(1); - final CountDownLatch editLatch = new CountDownLatch(1); - final CountDownLatch addLatch = new CountDownLatch(1); + final LatchedWatcher lw = new LatchedWatcher(); client.pods().inNamespace("ns1").create(pod1); - try (Watch watch = client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()).watch(new Watcher() { - @Override - public void eventReceived(Action action, Pod resource) { - switch (action) { - case DELETED: - deleteLatch.countDown(); - break; - case MODIFIED: - editLatch.countDown(); - break; - case ADDED: - addLatch.countDown(); - break; - default: - throw new AssertionFailedError(action.toString().concat(" isn't recognised.")); - } - } - - @Override - public void onClose(KubernetesClientException e) { - closeLatch.countDown(); - } - })) { - client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()) + try ( + Watch watch = client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()).watch(lw) + ) { + client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()) .patch(new PodBuilder().withNewMetadataLike(pod1.getMetadata()).endMetadata().build()); - client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()).delete(); + client.pods().inNamespace("ns1").withName(pod1.getMetadata().getName()).delete(); + + client.pods().inNamespace("ns1").create(new PodBuilder().withNewMetadata().withName("pod1").addToLabels("testKey", "testValue").endMetadata().build()); + + assertEquals(1, client.pods().inNamespace("ns1").list().getItems().size()); + assertTrue(lw.addLatch.await(1, TimeUnit.SECONDS)); + assertTrue(lw.editLatch.await(1, TimeUnit.SECONDS)); + assertTrue(lw.deleteLatch.await(1, TimeUnit.SECONDS)); + } + assertTrue(lw.closeLatch.await(3, TimeUnit.SECONDS)); + } + + private static final class LatchedWatcher implements Watcher { + final CountDownLatch addLatch; + final CountDownLatch editLatch; + final CountDownLatch deleteLatch; + final CountDownLatch closeErrorLatch; + final CountDownLatch closeLatch; + + public LatchedWatcher() { + this(1, 1, 1, 1, 1); + } + public LatchedWatcher(int addLatch, int editLatch, int deleteLatch, int closeErrorLatch, int closeLatch) { + this.addLatch = new CountDownLatch(addLatch); + this.editLatch = new CountDownLatch(editLatch); + this.deleteLatch = new CountDownLatch(deleteLatch); + this.closeErrorLatch = new CountDownLatch(closeErrorLatch); + this.closeLatch = new CountDownLatch(closeLatch); + } + + @Override + public void eventReceived(Action action, Pod resource) { + switch (action) { + case DELETED: + deleteLatch.countDown(); + break; + case MODIFIED: + editLatch.countDown(); + break; + case ADDED: + addLatch.countDown(); + break; + default: + throw new AssertionFailedError(action.toString().concat(" isn't recognised.")); + } + } - client.pods().inNamespace("ns1").create(new PodBuilder().withNewMetadata().withName("pod1").addToLabels("testKey", "testValue").endMetadata().build()); + @Override + public void onClose(WatcherException e) { + closeErrorLatch.countDown(); + } - assertEquals(1, client.pods().inNamespace("ns1").list().getItems().size()); - assertTrue(addLatch.await(1, TimeUnit.SECONDS)); - assertTrue(editLatch.await(1, TimeUnit.SECONDS)); - assertTrue(deleteLatch.await(1, TimeUnit.SECONDS)); - } finally { - assertTrue(closeLatch.await(3, TimeUnit.SECONDS)); + @Override + public void onClose() { + closeLatch.countDown(); } } } diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java index ee60bd1d81b..347682e13bc 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java @@ -43,6 +43,7 @@ import io.fabric8.kubernetes.client.PortForward; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.ExecListener; import io.fabric8.kubernetes.client.dsl.ExecWatch; import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; @@ -359,7 +360,7 @@ public void eventReceived(Action action, Pod resource) { } @Override - public void onClose(KubernetesClientException cause) { + public void onClose(WatcherException cause) { } }; // When diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java index ef02735a580..e96c71d4932 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java @@ -27,13 +27,14 @@ import io.fabric8.kubernetes.client.ResourceNotFoundException; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.Applicable; import io.fabric8.kubernetes.client.dsl.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicable; import io.fabric8.kubernetes.client.dsl.PodResource; -import io.fabric8.kubernetes.client.dsl.base.WaitForConditionWatcher; import io.fabric8.kubernetes.client.internal.readiness.Readiness; import io.fabric8.kubernetes.client.server.mock.KubernetesServer; import okhttp3.mockwebserver.RecordedRequest; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.Assert; import org.junit.Rule; import org.junit.jupiter.api.Assertions; @@ -58,7 +59,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; @EnableRuleMigrationSupport -public class ResourceTest { +class ResourceTest { @Rule public KubernetesServer server = new KubernetesServer(); @@ -186,7 +187,7 @@ public void eventReceived(Action action, Pod resource) { } @Override - public void onClose(KubernetesClientException cause) { + public void onClose(WatcherException cause) { } }); @@ -345,10 +346,11 @@ void testWaitUntilConditionWhenResourceVersionTooOld() throws InterruptedExcepti ops.waitUntilCondition(isReady, 4, SECONDS) ); assertThat(ex) - .hasCauseExactlyInstanceOf(WaitForConditionWatcher.WatchException.class); - assertThat(ex.getCause()) - .hasCauseExactlyInstanceOf(KubernetesClientException.class) - .hasMessage("Watcher closed"); + .hasCauseExactlyInstanceOf(WatcherException.class) + .extracting(Throwable::getCause) + .asInstanceOf(InstanceOfAssertFactories.type(WatcherException.class)) + .extracting(WatcherException::isHttpGone) + .isEqualTo(true); Pod pod = client.pods() .withName("pod1") @@ -473,7 +475,7 @@ void testDontRetryWatchOnHttpGone() throws InterruptedException { client.resource(noReady).waitUntilReady(5, SECONDS); fail("should have thrown KubernetesClientException"); } catch (KubernetesClientException e) { - assertTrue(e.getCause() instanceof WaitForConditionWatcher.WatchException); + assertTrue(e.getCause() instanceof WatcherException); } } diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java index 693ceab2452..7faac4149c5 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java @@ -33,6 +33,7 @@ import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.Watchable; import io.fabric8.kubernetes.client.server.mock.KubernetesServer; @@ -86,8 +87,8 @@ public void eventReceived(Action action, Pod resource) { } @Override - public void onClose(KubernetesClientException cause) { - assertEquals(410, cause.getCode()); + public void onClose(WatcherException cause) { + assertTrue(cause.isHttpGone()); closeLatch.countDown(); } }; @@ -115,8 +116,12 @@ public void eventReceived(Action action, Pod resource) { } @Override - public void onClose(KubernetesClientException cause) { - assertNull("Close event should be invoked by try-with-resources successful completion, not by exception", cause); + public void onClose(WatcherException cause) { + fail("Close event should be invoked by try-with-resources successful completion, not by exception"); + } + + @Override + public void onClose() { closeLatch.countDown(); } }; @@ -151,7 +156,7 @@ void testWithTimeoutSecondsShouldAddQueryParam() throws InterruptedException { public void eventReceived(Action action, Pod resource) { eventReceivedLatch.countDown(); } @Override - public void onClose(KubernetesClientException cause) { } + public void onClose(WatcherException cause) { } }); // Then @@ -181,8 +186,8 @@ public void eventReceived(Action action, Pod resource) { } @Override - public void onClose(KubernetesClientException cause) { - assertEquals(410, cause.getCode()); + public void onClose(WatcherException cause) { + assertTrue(cause.isHttpGone()); closeLatch.countDown(); } }; @@ -211,7 +216,12 @@ public void eventReceived(Action action, Pod resource) { } @Override - public void onClose(KubernetesClientException cause) { + public void onClose(WatcherException cause) { + fail(); + } + + @Override + public void onClose() { closeLatch.countDown(); } }); @@ -257,7 +267,7 @@ public void eventReceived(Action action, Pod resource) { } @Override - public void onClose(KubernetesClientException cause) { + public void onClose(WatcherException cause) { } });