Skip to content

Commit 23418cb

Browse files
committed
refactor: Watcher.onClose has dedicated WatcherException as parameter.
1 parent 1e8abb9 commit 23418cb

File tree

28 files changed

+323
-219
lines changed

28 files changed

+323
-219
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#### Bugs
66

77
#### Improvements
8+
* Fix #2614: Watcher.onClose has dedicated WatcherException as parameter.
89

910
#### Dependency Upgrade
1011

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/Watcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public interface Watcher<T> {
2424
*
2525
* @param cause What caused the watcher to be closed. Null means normal close.
2626
*/
27-
void onClose(KubernetesClientException cause);
27+
void onClose(WatcherException cause);
2828

2929
enum Action {
3030
ADDED, MODIFIED, DELETED, ERROR
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/**
2+
* Copyright (C) 2015 Red Hat, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.fabric8.kubernetes.client;
17+
18+
import java.net.HttpURLConnection;
19+
20+
public class WatcherException extends Exception {
21+
22+
public WatcherException(String message, Throwable cause) {
23+
super(message, cause);
24+
}
25+
26+
public WatcherException(String message) {
27+
super(message);
28+
}
29+
30+
public KubernetesClientException asClientException() {
31+
final Throwable cause = getCause();
32+
return getCause() instanceof KubernetesClientException ?
33+
(KubernetesClientException) cause : new KubernetesClientException(getMessage(), cause);
34+
}
35+
36+
public boolean isHttpGone() {
37+
final KubernetesClientException cause = asClientException();
38+
return cause.getCode() == HttpURLConnection.HTTP_GONE
39+
|| (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE);
40+
}
41+
42+
public boolean isShouldRetry() {
43+
return getCause() == null || !isHttpGone();
44+
}
45+
}

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.fabric8.kubernetes.client.dsl.base;
1717

1818
import io.fabric8.kubernetes.api.model.ObjectReference;
19+
import io.fabric8.kubernetes.client.WatcherException;
1920
import io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper;
2021
import org.slf4j.Logger;
2122
import org.slf4j.LoggerFactory;
@@ -45,7 +46,6 @@
4546
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
4647
import io.fabric8.kubernetes.client.dsl.Replaceable;
4748
import io.fabric8.kubernetes.client.dsl.Resource;
48-
import io.fabric8.kubernetes.client.dsl.base.WaitForConditionWatcher.WatchException;
4949
import io.fabric8.kubernetes.client.dsl.internal.DefaultOperationInfo;
5050
import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager;
5151
import io.fabric8.kubernetes.client.dsl.internal.WatchHTTPManager;
@@ -1104,7 +1104,7 @@ private T waitUntilConditionWithRetries(Predicate<T> condition, long timeoutNano
11041104
return watcher.getFuture().get(remainingNanosToWait, NANOSECONDS);
11051105
} catch (ExecutionException e) {
11061106
Throwable cause = e.getCause();
1107-
if (cause instanceof WatchException && ((WatchException) cause).isShouldRetry()) {
1107+
if (cause instanceof WatcherException && ((WatcherException) cause).isShouldRetry()) {
11081108
LOG.debug("retryable watch exception encountered, retrying after {} millis", currentBackOff, cause);
11091109
Thread.sleep(currentBackOff);
11101110
currentBackOff *= watchRetryBackoffMultiplier;

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcher.java

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@
1515
*/
1616
package io.fabric8.kubernetes.client.dsl.base;
1717

18-
import java.net.HttpURLConnection;
1918
import java.util.concurrent.CompletableFuture;
2019
import java.util.function.Predicate;
2120

2221
import io.fabric8.kubernetes.api.model.HasMetadata;
23-
import io.fabric8.kubernetes.client.KubernetesClientException;
2422
import io.fabric8.kubernetes.client.Watcher;
23+
import io.fabric8.kubernetes.client.WatcherException;
2524

2625
public class WaitForConditionWatcher<T extends HasMetadata> implements Watcher<T> {
2726

@@ -52,34 +51,14 @@ public void eventReceived(Action action, T resource) {
5251
}
5352
break;
5453
case ERROR:
55-
future.completeExceptionally(new WatchException("Action.ERROR received"));
54+
future.completeExceptionally(new WatcherException("Action.ERROR received"));
5655
break;
5756
}
5857
}
5958

6059
@Override
61-
public void onClose(KubernetesClientException cause) {
62-
future.completeExceptionally(new WatchException("Watcher closed", cause));
60+
public void onClose(WatcherException cause) {
61+
future.completeExceptionally(cause == null ? new WatcherException("Watcher closed") : cause);
6362
}
6463

65-
public static class WatchException extends Exception {
66-
67-
public WatchException(String message, KubernetesClientException cause) {
68-
super(message, cause);
69-
}
70-
71-
public WatchException(String message) {
72-
super(message);
73-
}
74-
75-
public boolean isShouldRetry() {
76-
return getCause() == null || !isHttpGone();
77-
}
78-
79-
private boolean isHttpGone() {
80-
KubernetesClientException cause = ((KubernetesClientException) getCause());
81-
return cause.getCode() == HttpURLConnection.HTTP_GONE
82-
|| (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE);
83-
}
84-
}
8564
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/**
2+
* Copyright (C) 2015 Red Hat, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.fabric8.kubernetes.client.dsl.internal;
17+
18+
import io.fabric8.kubernetes.api.model.ListOptions;
19+
import io.fabric8.kubernetes.client.Watch;
20+
import io.fabric8.kubernetes.client.Watcher;
21+
import io.fabric8.kubernetes.client.WatcherException;
22+
import okhttp3.OkHttpClient;
23+
import okhttp3.WebSocket;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.concurrent.atomic.AtomicReference;
29+
30+
public abstract class AbstractWatchManager<T> implements Watch {
31+
32+
private static final Logger logger = LoggerFactory.getLogger(AbstractWatchManager.class);
33+
34+
final Watcher<T> watcher;
35+
final int reconnectLimit;
36+
final int reconnectInterval;
37+
final int maxIntervalExponent;
38+
final ListOptions listOptions;
39+
final AtomicReference<String> resourceVersion;
40+
final OkHttpClient clonedClient;
41+
42+
private final AtomicBoolean forceClosed;
43+
44+
AbstractWatchManager(
45+
Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent,
46+
OkHttpClient clonedClient
47+
) {
48+
this.watcher = watcher;
49+
this.listOptions = listOptions;
50+
this.reconnectLimit = reconnectLimit;
51+
this.reconnectInterval = reconnectInterval;
52+
this.maxIntervalExponent = maxIntervalExponent;
53+
this.clonedClient = clonedClient;
54+
this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion());
55+
this.forceClosed = new AtomicBoolean();
56+
}
57+
58+
final void closeEvent(WatcherException cause) {
59+
if (forceClosed.getAndSet(true)) {
60+
logger.debug("Ignoring duplicate firing of onClose event");
61+
return;
62+
}
63+
watcher.onClose(cause);
64+
}
65+
66+
static void closeWebSocket(WebSocket webSocket) {
67+
if (webSocket != null) {
68+
logger.debug("Closing websocket {}", webSocket);
69+
try {
70+
if (!webSocket.close(1000, null)) {
71+
logger.warn("Failed to close websocket");
72+
}
73+
} catch (IllegalStateException e) {
74+
logger.error("Called close on already closed websocket: {} {}", e.getClass(), e.getMessage());
75+
}
76+
}
77+
}
78+
}

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java

Lines changed: 9 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
import io.fabric8.kubernetes.api.model.ListOptions;
2020
import io.fabric8.kubernetes.api.model.Status;
2121
import io.fabric8.kubernetes.client.KubernetesClientException;
22-
import io.fabric8.kubernetes.client.Watch;
2322
import io.fabric8.kubernetes.client.Watcher;
23+
import io.fabric8.kubernetes.client.WatcherException;
2424
import io.fabric8.kubernetes.client.dsl.base.OperationSupport;
2525
import io.fabric8.kubernetes.client.utils.Utils;
2626
import okhttp3.HttpUrl;
@@ -53,17 +53,12 @@
5353
* instead of using a solid type for deserializing events, it uses plain strings.
5454
*
5555
*/
56-
public class RawWatchConnectionManager implements Watch {
56+
public class RawWatchConnectionManager extends AbstractWatchManager<String> {
5757
private static final Logger logger = LoggerFactory.getLogger(RawWatchConnectionManager.class);
5858
private ObjectMapper objectMapper;
59-
private Watcher<String> watcher;
6059
private HttpUrl.Builder watchUrlBuilder;
6160

6261
private final AtomicBoolean forceClosed = new AtomicBoolean();
63-
private final AtomicReference<String> resourceVersion;
64-
private final int reconnectLimit;
65-
private final int reconnectInterval;
66-
private int maxIntervalExponent;
6762
private final AtomicInteger currentReconnectAttempt = new AtomicInteger(0);
6863
private final AtomicReference<WebSocket> webSocketRef = new AtomicReference<>();
6964
// single threaded serial executor
@@ -75,17 +70,14 @@ public class RawWatchConnectionManager implements Watch {
7570
private final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
7671

7772
private WebSocket webSocket;
78-
private OkHttpClient clonedClient;
7973

8074
public RawWatchConnectionManager(OkHttpClient okHttpClient, HttpUrl.Builder watchUrlBuilder, ListOptions listOptions, ObjectMapper objectMapper, final Watcher<String> watcher, int reconnectLimit, int reconnectInterval, int maxIntervalExponent) {
81-
this.clonedClient = okHttpClient;
75+
super(
76+
watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent,
77+
okHttpClient.newBuilder().build()
78+
);
8279
this.watchUrlBuilder = watchUrlBuilder;
8380
this.objectMapper = objectMapper;
84-
this.watcher = watcher;
85-
this.reconnectLimit = reconnectLimit;
86-
this.reconnectInterval = reconnectInterval;
87-
this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion());
88-
this.maxIntervalExponent = maxIntervalExponent;
8981
executor = Executors.newSingleThreadScheduledExecutor(r -> {
9082
Thread ret = new Thread(r, "Executor for Watch " + System.identityHashCode(RawWatchConnectionManager.this));
9183
ret.setDaemon(true);
@@ -156,7 +148,7 @@ public void onClosed(WebSocket webSocket, int code, String reason) {
156148
return;
157149
}
158150
if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) {
159-
closeEvent(new KubernetesClientException("Connection unexpectedly closed"));
151+
closeEvent(new WatcherException("Connection unexpectedly closed"));
160152
return;
161153
}
162154
scheduleReconnect();
@@ -204,7 +196,7 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
204196
}
205197

206198
if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) {
207-
closeEvent(new KubernetesClientException("Connection failure", t));
199+
closeEvent(new WatcherException("Connection failure", t));
208200
return;
209201
}
210202

@@ -242,7 +234,7 @@ public void execute() {
242234
// An unexpected error occurred and we didn't even get an onFailure callback.
243235
logger.error("Exception in reconnect", e);
244236
webSocketRef.set(null);
245-
closeEvent(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
237+
closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e));
246238
close();
247239
}
248240
}
@@ -277,27 +269,6 @@ public void close() {
277269
}
278270
}
279271

280-
private void closeEvent(KubernetesClientException cause) {
281-
if (forceClosed.getAndSet(true)) {
282-
logger.debug("Ignoring duplicate firing of onClose event");
283-
return;
284-
}
285-
watcher.onClose(cause);
286-
}
287-
288-
private void closeWebSocket(WebSocket ws) {
289-
if (ws != null) {
290-
logger.debug("Closing websocket {}", ws);
291-
try {
292-
if (!ws.close(1000, null)) {
293-
logger.warn("Failed to close websocket");
294-
}
295-
} catch (IllegalStateException e) {
296-
logger.error("Called close on already closed websocket: {} {}", e.getClass(), e.getMessage());
297-
}
298-
}
299-
}
300-
301272
private long nextReconnectInterval() {
302273
int exponentOfTwo = currentReconnectAttempt.getAndIncrement();
303274
if (exponentOfTwo > maxIntervalExponent)

0 commit comments

Comments
 (0)