Skip to content

Commit 00b78c7

Browse files
committed
Allow watching resources over HTTP if watching using websockets fails
1 parent 39a7398 commit 00b78c7

File tree

2 files changed

+296
-0
lines changed

2 files changed

+296
-0
lines changed

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import io.fabric8.kubernetes.client.dsl.Reaper;
4141
import io.fabric8.kubernetes.client.dsl.Watchable;
4242
import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager;
43+
import io.fabric8.kubernetes.client.dsl.internal.WatchHTTPManager;
4344
import io.fabric8.kubernetes.client.utils.URLUtils;
4445

4546
import java.io.File;
@@ -649,6 +650,22 @@ public Watch watch(String resourceVersion, final Watcher<T> watcher) throws Kube
649650
return watch;
650651
} catch (MalformedURLException e) {
651652
throw KubernetesClientException.launderThrowable(e);
653+
} catch (KubernetesClientException ke) {
654+
WatchHTTPManager watch = null;
655+
try {
656+
watch = new WatchHTTPManager(
657+
client,
658+
this,
659+
resourceVersion,
660+
watcher,
661+
config.getWatchReconnectInterval(),
662+
config.getWatchReconnectLimit(),
663+
config.getConnectionTimeout()
664+
);
665+
} catch (MalformedURLException e) {
666+
throw KubernetesClientException.launderThrowable(e);
667+
}
668+
return watch;
652669
}
653670
}
654671

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
/**
2+
* Copyright (C) 2017 Google, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.fabric8.kubernetes.client.dsl.internal;
18+
19+
import static io.fabric8.kubernetes.client.utils.Utils.isNotNullOrEmpty;
20+
import static java.net.HttpURLConnection.HTTP_GONE;
21+
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import io.fabric8.kubernetes.api.model.HasMetadata;
24+
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
25+
import io.fabric8.kubernetes.api.model.Status;
26+
import io.fabric8.kubernetes.api.model.WatchEvent;
27+
import io.fabric8.kubernetes.client.KubernetesClientException;
28+
import io.fabric8.kubernetes.client.Watch;
29+
import io.fabric8.kubernetes.client.Watcher;
30+
import io.fabric8.kubernetes.client.dsl.base.BaseOperation;
31+
32+
import java.io.IOException;
33+
import java.net.MalformedURLException;
34+
import java.net.URL;
35+
import java.util.concurrent.Executors;
36+
import java.util.concurrent.RejectedExecutionException;
37+
import java.util.concurrent.ScheduledExecutorService;
38+
import java.util.concurrent.ThreadFactory;
39+
import java.util.concurrent.TimeUnit;
40+
import java.util.concurrent.atomic.AtomicBoolean;
41+
import java.util.concurrent.atomic.AtomicInteger;
42+
import java.util.concurrent.atomic.AtomicReference;
43+
import okhttp3.HttpUrl;
44+
import okhttp3.Interceptor;
45+
import okhttp3.OkHttpClient;
46+
import okhttp3.Request;
47+
import okhttp3.Response;
48+
import okhttp3.logging.HttpLoggingInterceptor;
49+
import okio.BufferedSource;
50+
import org.slf4j.Logger;
51+
import org.slf4j.LoggerFactory;
52+
53+
public class WatchHTTPManager<T extends HasMetadata, L extends KubernetesResourceList<T>> implements
54+
Watch {
55+
private static final Logger logger = LoggerFactory.getLogger(WatchHTTPManager.class);
56+
private static final ObjectMapper mapper = new ObjectMapper();
57+
58+
private final BaseOperation<T, L, ?, ?> baseOperation;
59+
private final Watcher<T> watcher;
60+
private final AtomicBoolean forceClosed = new AtomicBoolean();
61+
private final AtomicReference<String> resourceVersion;
62+
private final int reconnectLimit;
63+
private final int reconnectInterval;
64+
65+
private final AtomicBoolean reconnectPending = new AtomicBoolean(false);
66+
private final static int maxIntervalExponent = 5; // max 32x slowdown from base interval
67+
private final URL requestUrl;
68+
private final AtomicInteger currentReconnectAttempt = new AtomicInteger(0);
69+
private OkHttpClient clonedClient;
70+
71+
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
72+
@Override
73+
public Thread newThread(Runnable r) {
74+
Thread ret = new Thread(r, "Executor for Watch " + System.identityHashCode(WatchHTTPManager.this));
75+
ret.setDaemon(true);
76+
return ret;
77+
}
78+
});
79+
80+
public WatchHTTPManager(final OkHttpClient client,
81+
final BaseOperation<T, L, ?, ?> baseOperation,
82+
final String version, final Watcher<T> watcher, final int reconnectInterval,
83+
final int reconnectLimit, long connectTimeout)
84+
throws MalformedURLException {
85+
86+
if (version == null) {
87+
L currentList = baseOperation.list();
88+
this.resourceVersion = new AtomicReference<>(currentList.getMetadata().getResourceVersion());
89+
} else {
90+
this.resourceVersion = new AtomicReference<>(version);
91+
}
92+
this.baseOperation = baseOperation;
93+
this.watcher = watcher;
94+
this.reconnectInterval = reconnectInterval;
95+
this.reconnectLimit = reconnectLimit;
96+
97+
OkHttpClient clonedClient = client.newBuilder()
98+
.connectTimeout(connectTimeout, TimeUnit.MILLISECONDS)
99+
.readTimeout(0,TimeUnit.MILLISECONDS)
100+
.cache(null)
101+
.build();
102+
103+
// If we set the HttpLoggingInterceptor's logging level to Body (as it is by default), it does
104+
// not let us stream responses from the server.
105+
for (Interceptor i : clonedClient.networkInterceptors()) {
106+
if (i instanceof HttpLoggingInterceptor) {
107+
HttpLoggingInterceptor interceptor = (HttpLoggingInterceptor) i;
108+
interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
109+
}
110+
}
111+
112+
this.clonedClient = clonedClient;
113+
requestUrl = baseOperation.getNamespacedUrl();
114+
scheduleReconnect();
115+
}
116+
117+
private final void runWatch() {
118+
logger.debug("Watching via HTTP GET ... {}", this);
119+
120+
HttpUrl.Builder httpUrlBuilder = HttpUrl.get(requestUrl).newBuilder();
121+
String labelQueryParam = baseOperation.getLabelQueryParam();
122+
if (isNotNullOrEmpty(labelQueryParam)) {
123+
httpUrlBuilder.addQueryParameter("labelSelector", labelQueryParam);
124+
}
125+
126+
String fieldQueryString = baseOperation.getFieldQueryParam();
127+
String name = baseOperation.getName();
128+
if (name != null && name.length() > 0) {
129+
if (fieldQueryString.length() > 0) {
130+
fieldQueryString += ",";
131+
}
132+
fieldQueryString += "metadata.name=" + name;
133+
}
134+
135+
if (isNotNullOrEmpty(fieldQueryString)) {
136+
httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString);
137+
}
138+
139+
httpUrlBuilder
140+
.addQueryParameter("resourceVersion", this.resourceVersion.get())
141+
.addQueryParameter("watch", "true");
142+
143+
final Request request = new Request.Builder()
144+
.get()
145+
.url(httpUrlBuilder.build())
146+
.addHeader("Origin", requestUrl.getProtocol() + "://" + requestUrl.getHost() + ":" + requestUrl.getPort())
147+
.build();
148+
149+
Response response = null;
150+
try {
151+
response = clonedClient.newCall(request).execute();
152+
BufferedSource source = response.body().source();
153+
while (!source.exhausted()) {
154+
String message = source.readUtf8LineStrict();
155+
onMessage(message);
156+
}
157+
} catch (IOException e) {
158+
logger.info("Watch connection close received. reason: {}", e.getMessage());
159+
} finally {
160+
if (forceClosed.get()) {
161+
logger.warn("Ignoring onClose for already closed/closing connection");
162+
return;
163+
}
164+
if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) {
165+
watcher.onClose(new KubernetesClientException("Connection unexpectedly closed"));
166+
return;
167+
}
168+
169+
// if we get here, the source is exhausted, so, we have lost our "watch".
170+
// we must reconnect.
171+
if (response != null) {
172+
response.body().close();
173+
}
174+
scheduleReconnect();
175+
}
176+
}
177+
178+
private void scheduleReconnect() {
179+
logger.debug("Submitting reconnect task to the executor");
180+
// make sure that whichever thread calls this method, the tasks are
181+
// performed serially in the executor.
182+
executor.submit(new Runnable() {
183+
@Override
184+
public void run() {
185+
if (!reconnectPending.compareAndSet(false, true)) {
186+
logger.debug("Reconnect already scheduled");
187+
return;
188+
}
189+
try {
190+
// actual reconnect only after the back-off time has passed, without
191+
// blocking the thread
192+
logger.debug("Scheduling reconnect task");
193+
executor.schedule(new Runnable() {
194+
@Override
195+
public void run() {
196+
try {
197+
WatchHTTPManager.this.runWatch();
198+
reconnectPending.set(false);
199+
} catch (Exception e) {
200+
// An unexpected error occurred and we didn't even get an onFailure callback.
201+
logger.error("Exception in reconnect", e);
202+
close();
203+
watcher.onClose(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
204+
}
205+
}
206+
}, nextReconnectInterval(), TimeUnit.MILLISECONDS);
207+
} catch (RejectedExecutionException e) {
208+
logger.error("Exception in reconnect", e);
209+
reconnectPending.set(false);
210+
}
211+
}
212+
});
213+
}
214+
215+
public void onMessage(String messageSource) throws IOException {
216+
try {
217+
WatchEvent event = mapper.readValue(messageSource, WatchEvent.class);
218+
if (event.getObject() instanceof HasMetadata) {
219+
@SuppressWarnings("unchecked")
220+
T obj = (T) event.getObject();
221+
// Dirty cast - should always be valid though
222+
String currentResourceVersion = resourceVersion.get();
223+
String newResourceVersion = ((HasMetadata) obj).getMetadata().getResourceVersion();
224+
if (currentResourceVersion.compareTo(newResourceVersion) < 0) {
225+
resourceVersion.compareAndSet(currentResourceVersion, newResourceVersion);
226+
}
227+
Watcher.Action action = Watcher.Action.valueOf(event.getType());
228+
watcher.eventReceived(action, obj);
229+
} else if (event.getObject() instanceof Status) {
230+
Status status = (Status) event.getObject();
231+
232+
// The resource version no longer exists - this has to be handled by the caller.
233+
if (status.getCode() == HTTP_GONE) {
234+
// exception
235+
// shut down executor, etc.
236+
close();
237+
watcher.onClose(new KubernetesClientException(status));
238+
return;
239+
}
240+
241+
logger.error("Error received: {}", status.toString());
242+
} else {
243+
logger.error("Unknown message received: {}", messageSource);
244+
}
245+
} catch (IOException e) {
246+
logger.error("Could not deserialize watch event: {}", messageSource, e);
247+
} catch (ClassCastException e) {
248+
logger.error("Received wrong type of object for watch", e);
249+
} catch (IllegalArgumentException e) {
250+
logger.error("Invalid event type", e);
251+
}
252+
}
253+
254+
private long nextReconnectInterval() {
255+
int exponentOfTwo = currentReconnectAttempt.getAndIncrement();
256+
if (exponentOfTwo > maxIntervalExponent)
257+
exponentOfTwo = maxIntervalExponent;
258+
long ret = reconnectInterval * (1 << exponentOfTwo);
259+
logger.info("Current reconnect backoff is " + ret + " milliseconds (T" + exponentOfTwo + ")");
260+
return ret;
261+
}
262+
263+
@Override
264+
public void close() {
265+
logger.debug("Force closing the watch {}", this);
266+
forceClosed.set(true);
267+
if (!executor.isShutdown()) {
268+
try {
269+
executor.shutdown();
270+
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
271+
logger.warn("Executor didn't terminate in time after shutdown in close(), killing it in: {}", this);
272+
executor.shutdownNow();
273+
}
274+
} catch (Throwable t) {
275+
throw KubernetesClientException.launderThrowable(t);
276+
}
277+
}
278+
}
279+
}

0 commit comments

Comments
 (0)