Skip to content

Commit daaba85

Browse files
Merge pull request #652 from foxish/add-hanging-get
2 parents dd7e7ca + 17e350b commit daaba85

File tree

4 files changed

+445
-2
lines changed

4 files changed

+445
-2
lines changed

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import io.fabric8.kubernetes.client.dsl.Reaper;
4141
import io.fabric8.kubernetes.client.dsl.Watchable;
4242
import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager;
43+
import io.fabric8.kubernetes.client.dsl.internal.WatchHTTPManager;
4344
import io.fabric8.kubernetes.client.utils.URLUtils;
4445

4546
import java.io.File;
@@ -649,6 +650,27 @@ public Watch watch(String resourceVersion, final Watcher<T> watcher) throws Kube
649650
return watch;
650651
} catch (MalformedURLException e) {
651652
throw KubernetesClientException.launderThrowable(e);
653+
} catch (KubernetesClientException ke) {
654+
if (ke.getCode() != 200) {
655+
throw ke;
656+
}
657+
658+
// If the HTTP return code is 200, we retry the watch again using a persistent hanging
659+
// HTTP GET. This is meant to handle cases like kubectl local proxy which does not support
660+
// websockets. Issue: https://github.com/kubernetes/kubernetes/issues/25126
661+
try {
662+
return new WatchHTTPManager(
663+
client,
664+
this,
665+
resourceVersion,
666+
watcher,
667+
config.getWatchReconnectInterval(),
668+
config.getWatchReconnectLimit(),
669+
config.getConnectionTimeout()
670+
);
671+
} catch (MalformedURLException e) {
672+
throw KubernetesClientException.launderThrowable(e);
673+
}
652674
}
653675
}
654676

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/OperationSupport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ public static Status createStatus(int statusCode, String message) {
302302
return status;
303303
}
304304

305-
KubernetesClientException requestFailure(Request request, Status status) {
305+
public static KubernetesClientException requestFailure(Request request, Status status) {
306306
StringBuilder sb = new StringBuilder();
307307
sb.append("Failure executing: ").append(request.method())
308308
.append(" at: ").append(request.url().toString()).append(".");
@@ -318,7 +318,7 @@ KubernetesClientException requestFailure(Request request, Status status) {
318318
return new KubernetesClientException(sb.toString(), status.getCode(), status);
319319
}
320320

321-
KubernetesClientException requestException(Request request, Exception e) {
321+
public static KubernetesClientException requestException(Request request, Exception e) {
322322
StringBuilder sb = new StringBuilder();
323323
sb.append("Error executing: ").append(request.method())
324324
.append(" at: ").append(request.url().toString())
Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
/**
2+
* Copyright (C) 2015 Red Hat, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.fabric8.kubernetes.client.dsl.internal;
17+
18+
import static io.fabric8.kubernetes.client.utils.Utils.isNotNullOrEmpty;
19+
import static java.net.HttpURLConnection.HTTP_GONE;
20+
21+
import com.fasterxml.jackson.databind.ObjectMapper;
22+
import io.fabric8.kubernetes.api.model.HasMetadata;
23+
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
24+
import io.fabric8.kubernetes.api.model.Status;
25+
import io.fabric8.kubernetes.api.model.WatchEvent;
26+
import io.fabric8.kubernetes.client.KubernetesClient;
27+
import io.fabric8.kubernetes.client.KubernetesClientException;
28+
import io.fabric8.kubernetes.client.Watch;
29+
import io.fabric8.kubernetes.client.Watcher;
30+
import io.fabric8.kubernetes.client.dsl.base.BaseOperation;
31+
32+
import io.fabric8.kubernetes.client.dsl.base.OperationSupport;
33+
import java.io.IOException;
34+
import java.net.MalformedURLException;
35+
import java.net.URL;
36+
import java.util.concurrent.Executors;
37+
import java.util.concurrent.RejectedExecutionException;
38+
import java.util.concurrent.ScheduledExecutorService;
39+
import java.util.concurrent.ThreadFactory;
40+
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.atomic.AtomicBoolean;
42+
import java.util.concurrent.atomic.AtomicInteger;
43+
import java.util.concurrent.atomic.AtomicReference;
44+
import okhttp3.HttpUrl;
45+
import okhttp3.Interceptor;
46+
import okhttp3.OkHttpClient;
47+
import okhttp3.Request;
48+
import okhttp3.Response;
49+
import okhttp3.logging.HttpLoggingInterceptor;
50+
import okio.BufferedSource;
51+
import org.slf4j.Logger;
52+
import org.slf4j.LoggerFactory;
53+
54+
public class WatchHTTPManager<T extends HasMetadata, L extends KubernetesResourceList<T>> implements
55+
Watch {
56+
private static final Logger logger = LoggerFactory.getLogger(WatchHTTPManager.class);
57+
private static final ObjectMapper mapper = new ObjectMapper();
58+
59+
private final BaseOperation<T, L, ?, ?> baseOperation;
60+
private final Watcher<T> watcher;
61+
private final AtomicBoolean forceClosed = new AtomicBoolean();
62+
private final AtomicReference<String> resourceVersion;
63+
private final int reconnectLimit;
64+
private final int reconnectInterval;
65+
66+
private final AtomicBoolean reconnectPending = new AtomicBoolean(false);
67+
private final static int maxIntervalExponent = 5; // max 32x slowdown from base interval
68+
private final URL requestUrl;
69+
private final AtomicInteger currentReconnectAttempt = new AtomicInteger(0);
70+
private OkHttpClient clonedClient;
71+
72+
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
73+
@Override
74+
public Thread newThread(Runnable r) {
75+
Thread ret = new Thread(r, "Executor for Watch " + System.identityHashCode(WatchHTTPManager.this));
76+
ret.setDaemon(true);
77+
return ret;
78+
}
79+
});
80+
81+
public WatchHTTPManager(final OkHttpClient client,
82+
final BaseOperation<T, L, ?, ?> baseOperation,
83+
final String version, final Watcher<T> watcher, final int reconnectInterval,
84+
final int reconnectLimit, long connectTimeout)
85+
throws MalformedURLException {
86+
87+
if (version == null) {
88+
L currentList = baseOperation.list();
89+
this.resourceVersion = new AtomicReference<>(currentList.getMetadata().getResourceVersion());
90+
} else {
91+
this.resourceVersion = new AtomicReference<>(version);
92+
}
93+
this.baseOperation = baseOperation;
94+
this.watcher = watcher;
95+
this.reconnectInterval = reconnectInterval;
96+
this.reconnectLimit = reconnectLimit;
97+
98+
OkHttpClient clonedClient = client.newBuilder()
99+
.connectTimeout(connectTimeout, TimeUnit.MILLISECONDS)
100+
.readTimeout(0,TimeUnit.MILLISECONDS)
101+
.cache(null)
102+
.build();
103+
104+
// If we set the HttpLoggingInterceptor's logging level to Body (as it is by default), it does
105+
// not let us stream responses from the server.
106+
for (Interceptor i : clonedClient.networkInterceptors()) {
107+
if (i instanceof HttpLoggingInterceptor) {
108+
HttpLoggingInterceptor interceptor = (HttpLoggingInterceptor) i;
109+
interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
110+
}
111+
}
112+
113+
this.clonedClient = clonedClient;
114+
requestUrl = baseOperation.getNamespacedUrl();
115+
runWatch();
116+
}
117+
118+
private final void runWatch() {
119+
logger.debug("Watching via HTTP GET ... {}", this);
120+
121+
HttpUrl.Builder httpUrlBuilder = HttpUrl.get(requestUrl).newBuilder();
122+
String labelQueryParam = baseOperation.getLabelQueryParam();
123+
if (isNotNullOrEmpty(labelQueryParam)) {
124+
httpUrlBuilder.addQueryParameter("labelSelector", labelQueryParam);
125+
}
126+
127+
String fieldQueryString = baseOperation.getFieldQueryParam();
128+
String name = baseOperation.getName();
129+
if (name != null && name.length() > 0) {
130+
if (fieldQueryString.length() > 0) {
131+
fieldQueryString += ",";
132+
}
133+
fieldQueryString += "metadata.name=" + name;
134+
}
135+
136+
if (isNotNullOrEmpty(fieldQueryString)) {
137+
httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString);
138+
}
139+
140+
httpUrlBuilder
141+
.addQueryParameter("resourceVersion", this.resourceVersion.get())
142+
.addQueryParameter("watch", "true");
143+
144+
final Request request = new Request.Builder()
145+
.get()
146+
.url(httpUrlBuilder.build())
147+
.addHeader("Origin", requestUrl.getProtocol() + "://" + requestUrl.getHost() + ":" + requestUrl.getPort())
148+
.build();
149+
150+
Response response = null;
151+
try {
152+
response = clonedClient.newCall(request).execute();
153+
if(!response.isSuccessful()) {
154+
throw OperationSupport.requestFailure(request,
155+
OperationSupport.createStatus(response.code(), response.message()));
156+
}
157+
158+
BufferedSource source = response.body().source();
159+
while (!source.exhausted()) {
160+
String message = source.readUtf8LineStrict();
161+
onMessage(message);
162+
}
163+
} catch (Exception e) {
164+
logger.info("Watch connection close received. reason: {}", e.getMessage());
165+
} finally {
166+
if (forceClosed.get()) {
167+
logger.warn("Ignoring onClose for already closed/closing connection");
168+
return;
169+
}
170+
if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) {
171+
watcher.onClose(new KubernetesClientException("Connection unexpectedly closed"));
172+
return;
173+
}
174+
175+
176+
// if we get here, the source is exhausted, so, we have lost our "watch".
177+
// we must reconnect.
178+
if (response != null) {
179+
response.body().close();
180+
}
181+
scheduleReconnect();
182+
}
183+
}
184+
185+
private void scheduleReconnect() {
186+
logger.debug("Submitting reconnect task to the executor");
187+
// make sure that whichever thread calls this method, the tasks are
188+
// performed serially in the executor.
189+
executor.submit(new Runnable() {
190+
@Override
191+
public void run() {
192+
if (!reconnectPending.compareAndSet(false, true)) {
193+
logger.debug("Reconnect already scheduled");
194+
return;
195+
}
196+
try {
197+
// actual reconnect only after the back-off time has passed, without
198+
// blocking the thread
199+
logger.debug("Scheduling reconnect task");
200+
executor.schedule(new Runnable() {
201+
@Override
202+
public void run() {
203+
try {
204+
WatchHTTPManager.this.runWatch();
205+
reconnectPending.set(false);
206+
} catch (Exception e) {
207+
// An unexpected error occurred and we didn't even get an onFailure callback.
208+
logger.error("Exception in reconnect", e);
209+
close();
210+
watcher.onClose(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
211+
}
212+
}
213+
}, nextReconnectInterval(), TimeUnit.MILLISECONDS);
214+
} catch (RejectedExecutionException e) {
215+
logger.error("Exception in reconnect", e);
216+
reconnectPending.set(false);
217+
}
218+
}
219+
});
220+
}
221+
222+
public void onMessage(String messageSource) throws IOException {
223+
try {
224+
WatchEvent event = mapper.readValue(messageSource, WatchEvent.class);
225+
if (event.getObject() instanceof HasMetadata) {
226+
@SuppressWarnings("unchecked")
227+
T obj = (T) event.getObject();
228+
// Dirty cast - should always be valid though
229+
String currentResourceVersion = resourceVersion.get();
230+
String newResourceVersion = ((HasMetadata) obj).getMetadata().getResourceVersion();
231+
if (currentResourceVersion.compareTo(newResourceVersion) < 0) {
232+
resourceVersion.compareAndSet(currentResourceVersion, newResourceVersion);
233+
}
234+
Watcher.Action action = Watcher.Action.valueOf(event.getType());
235+
watcher.eventReceived(action, obj);
236+
} else if (event.getObject() instanceof Status) {
237+
Status status = (Status) event.getObject();
238+
// The resource version no longer exists - this has to be handled by the caller.
239+
if (status.getCode() == HTTP_GONE) {
240+
// exception
241+
// shut down executor, etc.
242+
close();
243+
watcher.onClose(new KubernetesClientException(status));
244+
return;
245+
}
246+
247+
logger.error("Error received: {}", status.toString());
248+
} else {
249+
logger.error("Unknown message received: {}", messageSource);
250+
}
251+
} catch (IOException e) {
252+
logger.error("Could not deserialize watch event: {}", messageSource, e);
253+
} catch (ClassCastException e) {
254+
logger.error("Received wrong type of object for watch", e);
255+
} catch (IllegalArgumentException e) {
256+
logger.error("Invalid event type", e);
257+
}
258+
}
259+
260+
private long nextReconnectInterval() {
261+
int exponentOfTwo = currentReconnectAttempt.getAndIncrement();
262+
if (exponentOfTwo > maxIntervalExponent)
263+
exponentOfTwo = maxIntervalExponent;
264+
long ret = reconnectInterval * (1 << exponentOfTwo);
265+
logger.info("Current reconnect backoff is " + ret + " milliseconds (T" + exponentOfTwo + ")");
266+
return ret;
267+
}
268+
269+
@Override
270+
public void close() {
271+
logger.debug("Force closing the watch {}", this);
272+
forceClosed.set(true);
273+
if (!executor.isShutdown()) {
274+
try {
275+
executor.shutdown();
276+
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
277+
logger.warn("Executor didn't terminate in time after shutdown in close(), killing it in: {}", this);
278+
executor.shutdownNow();
279+
}
280+
} catch (Throwable t) {
281+
throw KubernetesClientException.launderThrowable(t);
282+
}
283+
}
284+
}
285+
}

0 commit comments

Comments
 (0)