Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Make http watches async
  • Loading branch information
foxish committed Feb 9, 2017
commit 814bf4fe17eff801b5b119d303f995e8356e5568
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -15,41 +15,32 @@
*/
package io.fabric8.kubernetes.client.dsl.internal;

import static io.fabric8.kubernetes.client.utils.Utils.isNotNullOrEmpty;
import static java.net.HttpURLConnection.HTTP_GONE;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.Status;
import io.fabric8.kubernetes.api.model.WatchEvent;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.base.BaseOperation;

import io.fabric8.kubernetes.client.dsl.base.OperationSupport;
import okhttp3.*;
import okhttp3.logging.HttpLoggingInterceptor;
import okio.BufferedSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.HttpUrl;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.logging.HttpLoggingInterceptor;
import okio.BufferedSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.fabric8.kubernetes.client.utils.Utils.isNotNullOrEmpty;
import static java.net.HttpURLConnection.HTTP_GONE;

public class WatchHTTPManager<T extends HasMetadata, L extends KubernetesResourceList<T>> implements
Watch {
Expand Down Expand Up @@ -79,9 +70,9 @@ public Thread newThread(Runnable r) {
});

public WatchHTTPManager(final OkHttpClient client,
final BaseOperation<T, L, ?, ?> baseOperation,
final String version, final Watcher<T> watcher, final int reconnectInterval,
final int reconnectLimit, long connectTimeout)
final BaseOperation<T, L, ?, ?> baseOperation,
final String version, final Watcher<T> watcher, final int reconnectInterval,
final int reconnectLimit, long connectTimeout)
throws MalformedURLException {

if (version == null) {
Expand All @@ -97,7 +88,7 @@ public WatchHTTPManager(final OkHttpClient client,

OkHttpClient clonedClient = client.newBuilder()
.connectTimeout(connectTimeout, TimeUnit.MILLISECONDS)
.readTimeout(0,TimeUnit.MILLISECONDS)
.readTimeout(0, TimeUnit.MILLISECONDS)
.cache(null)
.build();

Expand Down Expand Up @@ -147,42 +138,51 @@ private final void runWatch() {
.addHeader("Origin", requestUrl.getProtocol() + "://" + requestUrl.getHost() + ":" + requestUrl.getPort())
.build();

Response response = null;
try {
response = clonedClient.newCall(request).execute();
if(!response.isSuccessful()) {
throw OperationSupport.requestFailure(request,
OperationSupport.createStatus(response.code(), response.message()));
clonedClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
logger.info("Watch connection failed. reason: {}", e.getMessage());
scheduleReconnect();
}

BufferedSource source = response.body().source();
while (!source.exhausted()) {
String message = source.readUtf8LineStrict();
onMessage(message);
}
} catch (Exception e) {
logger.info("Watch connection close received. reason: {}", e.getMessage());
} finally {
if (forceClosed.get()) {
logger.warn("Ignoring onClose for already closed/closing connection");
return;
}
if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) {
watcher.onClose(new KubernetesClientException("Connection unexpectedly closed"));
return;
}
@Override
public void onResponse(Call call, Response response) throws IOException {
if (!response.isSuccessful()) {
throw OperationSupport.requestFailure(request,
OperationSupport.createStatus(response.code(), response.message()));
}

try {
BufferedSource source = response.body().source();
while (!source.exhausted()) {
String message = source.readUtf8LineStrict();
onMessage(message);
}
} catch (Exception e) {
logger.info("Watch terminated unexpectedly. reason: {}", e.getMessage());
}

// if we get here, the source is exhausted, so, we have lost our "watch".
// we must reconnect.
if (response != null) {
response.body().close();
// if we get here, the source is exhausted, so, we have lost our "watch".
// we must reconnect.
if (response != null) {
response.body().close();
}
scheduleReconnect();
}
scheduleReconnect();
}
});
}

private void scheduleReconnect() {
if (forceClosed.get()) {
logger.warn("Ignoring error for already closed/closing connection");
return;
}

if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) {
watcher.onClose(new KubernetesClientException("Connection unexpectedly closed"));
return;
}

logger.debug("Submitting reconnect task to the executor");
// make sure that whichever thread calls this method, the tasks are
// performed serially in the executor.
Expand Down