Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ will be thrown

| dump_requests | Dumps all discovery requests and responses to the Kubernetes server to stdout when true

| split_clusters_during_rolling_update | During the Rolling Update, prevents from putting all Pods into a single cluster

|===============


Expand Down Expand Up @@ -312,4 +314,4 @@ The commands for running on https://cloud.google.com/container-engine/docs/[Goog
as when running locally in https://github.com/kubernetes/minikube[minikube].

The only difference is that on GKE, contrary to minikube, IP multicasting is not available. This means that the `probe.sh`
command has to be run as `probe.sh -addr localhost` instead of simply running `probe.sh`.
command has to be run as `probe.sh -addr localhost` instead of simply running `probe.sh`.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
<version.jgroups>4.0.4.Final</version.jgroups>
<version.oauth>20100527</version.oauth>
<version.junit>4.12</version.junit>
<version.assertj>3.8.0</version.assertj>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>

Expand Down Expand Up @@ -146,6 +147,12 @@
<version>${version.junit}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${version.assertj}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
59 changes: 36 additions & 23 deletions src/main/java/org/jgroups/protocols/kubernetes/Client.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package org.jgroups.protocols.kubernetes;

import mjson.Json;
import static org.jgroups.protocols.kubernetes.Utils.openStream;
import static org.jgroups.protocols.kubernetes.Utils.urlencode;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

import org.jgroups.logging.Log;
import org.jgroups.protocols.kubernetes.stream.StreamProvider;
import org.jgroups.util.Util;

import java.io.InputStream;
import java.util.*;

import static org.jgroups.protocols.kubernetes.Utils.openStream;
import static org.jgroups.protocols.kubernetes.Utils.urlencode;
import mjson.Json;

/**
* @author <a href="mailto:[email protected]">Ales Justin</a>
Expand Down Expand Up @@ -71,17 +77,18 @@ protected String fetchFromKubernetes(String op, String namespace, String labels,



public List<String> getPods(String namespace, String labels, boolean dump_requests) throws Exception {
public List<Pod> getPods(String namespace, String labels, boolean dump_requests) throws Exception {
String result=fetchFromKubernetes("pods", namespace, labels, dump_requests);
if(result == null)
return Collections.emptyList();
return parseJsonResult(result, namespace, labels);
}

protected List<String> parseJsonResult(String input, String namespace, String labels) {
protected List<Pod> parseJsonResult(String input, String namespace, String labels) {
if(input == null)
return Collections.emptyList();
Json json=Json.read(input);

if(json == null || !json.isObject()) {
log.error("JSON is not a map: %s", json);
return Collections.emptyList();
Expand All @@ -92,22 +99,28 @@ protected List<String> parseJsonResult(String input, String namespace, String la
return Collections.emptyList();
}
List<Json> items=json.at("items").asJsonList();
List<String> pods=new ArrayList<>();
List<Pod> pods=new ArrayList<>();
for(Json obj: items) {
if(obj.isObject() && obj.has("status")) {
Json status=obj.at("status");
if(status.isObject() && status.has("podIP")) {
String podIP=status.at("podIP").asString();
if(status.has("phase")) {
Json phase=status.at("phase");
if(phase != null && phase.isString() && !"Running".equals(phase.asString())) {
log.trace("skipped pod with IP=%s as it is not running (%s)", podIP, phase);
continue;
}
}
if(!pods.contains(podIP))
pods.add(podIP);
}
String parentDeployment = Optional.ofNullable(obj.at("metadata"))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@belaban Are you OK with this change?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Big change: I'd like to test this with minikube first... however the turnaround time for this is quite high .. stay tuned.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, no problem. Take your time and keep me posted ;)

.map(podMetadata -> podMetadata.at("labels"))
.map(podLabels -> podLabels.at("deployment"))
.map(podDeployment -> podDeployment.asString())
.orElseGet(() -> null);
String name = Optional.ofNullable(obj.at("metadata"))
.map(podMetadata -> podMetadata.at("name"))
.map(podName -> podName.asString())
.orElseGet(() -> null);
String podIP = Optional.ofNullable(obj.at("status"))
.map(podStatus -> podStatus.at("podIP"))
.map(podIp -> podIp.asString())
.orElseGet(() -> null);
if(podIP == null) {
//Previously we did checks on phase. But from my observations, it is extremely rare to have a container
//listed by Kubernetes API with any other status but Running (I might imagine it will hang in scheduled).
//However in both cases, its IP address will be null. So it is much better to stick to that.
log.trace("Skipping pod %s since it has no IP %s", name, podIP);
} else {
pods.add(new Pod(name, podIP, parentDeployment));
}
}
log.trace("getPods(%s, %s) = %s", namespace, labels, pods);
Expand Down
61 changes: 50 additions & 11 deletions src/main/java/org/jgroups/protocols/kubernetes/KUBE_PING.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@

package org.jgroups.protocols.kubernetes;

import static org.jgroups.protocols.kubernetes.Utils.readFileToString;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Message;
Expand All @@ -20,11 +32,6 @@
import org.jgroups.util.NameCache;
import org.jgroups.util.Responses;

import java.util.*;
import java.util.stream.Collectors;

import static org.jgroups.protocols.kubernetes.Utils.readFileToString;

/**
* Kubernetes based discovery protocol. Uses the Kubernetes master to fetch the IP addresses of all pods that have
* been created, then pings each pods separately. The ports are defined by bind_port in TP plus port_range.
Expand Down Expand Up @@ -101,6 +108,11 @@ public class KUBE_PING extends Discovery {
@Property(description="Dumps all discovery requests and responses to the Kubernetes server to stdout when true")
protected boolean dump_requests;

@Property(description="The standard behavior during Rolling Update is to put all Pods in the same cluster. In" +
" cases (application level incompatibility) this causes problems. One might decide to split clusters to" +
" 'old' and 'new' during that process")
protected boolean split_clusters_during_rolling_update =false;

protected Client client;

protected int tp_bind_port;
Expand Down Expand Up @@ -168,7 +180,7 @@ public void init() throws Exception {
}

public void findMembers(List<Address> members, boolean initial_discovery, Responses responses) {
List<String> hosts=readAll();
List<Pod> hosts=readAll();
List<PhysicalAddress> cluster_members=new ArrayList<>(hosts != null? hosts.size() : 16);
PhysicalAddress physical_addr=null;
PingData data=null;
Expand All @@ -185,10 +197,10 @@ public void findMembers(List<Address> members, boolean initial_discovery, Respon
if(hosts != null) {
if(log.isTraceEnabled())
log.trace("%s: hosts fetched from Kubernetes: %s", local_addr, hosts);
for(String host: hosts) {
for(Pod host: hosts) {
for(int i=0; i <= port_range; i++) {
try {
IpAddress addr=new IpAddress(host, tp_bind_port + i);
IpAddress addr=new IpAddress(host.getIp(), tp_bind_port + i);
if(!cluster_members.contains(addr))
cluster_members.add(addr);
}
Expand All @@ -206,6 +218,33 @@ public void findMembers(List<Address> members, boolean initial_discovery, Respon
list.stream().filter(phys_addr -> !cluster_members.contains(phys_addr)).forEach(cluster_members::add);
}

if (split_clusters_during_rolling_update) {
if(physical_addr != null) {
String senderIp = ((IpAddress)physical_addr).getIpAddress().getHostAddress();
Copy link
Member Author

@slaskawi slaskawi Aug 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@belaban Could you please have a look at this? Can I safely cast to IpAddress? I didn't find any problems but maybe there are some corner cases?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this should be find as I don't have other impls of PhysicalAddress

String senderParentDeployment = hosts.stream()
.filter(pod -> senderIp.contains(pod.getIp()))
.map(pod -> pod.getParentDeployment())
.findFirst().orElse(null);
if(senderParentDeployment != null) {
Set<String> allowedAddresses = hosts.stream()
.filter(pod -> senderParentDeployment.equals(pod.getParentDeployment()))
.map(pod -> pod.getIp())
.collect(Collectors.toSet());
for(Iterator<PhysicalAddress> memberIterator = cluster_members.iterator(); memberIterator.hasNext();) {
IpAddress podAddress = (IpAddress) memberIterator.next();
if(!allowedAddresses.contains(podAddress.getIpAddress().getHostAddress())) {
log.trace("removing pod %s from cluster members list since its parent domain is different than senders (%s). Allowed hosts: %s", podAddress, senderParentDeployment, allowedAddresses);
memberIterator.remove();
}
}
} else {
log.warn("split_clusters_during_rolling_update is set to 'true' but can't obtain local node parent deployment. All nodes will be placed in the same cluster.");
}
} else {
log.warn("split_clusters_during_rolling_update is set to 'true' but can't obtain local node IP address. All nodes will be placed in the same cluster.");
}
}

if(log.isTraceEnabled())
log.trace("%s: sending discovery requests to %s", local_addr, cluster_members);
PingHeader hdr=new PingHeader(PingHeader.GET_MBRS_REQ).clusterName(cluster_name).initialDiscovery(initial_discovery);
Expand All @@ -229,12 +268,12 @@ public void findMembers(List<Address> members, boolean initial_discovery, Respon

@ManagedOperation(description="Asks Kubernetes for the IP addresses of all pods")
public String fetchFromKube() {
List<String> list=readAll();
return list.stream().collect(Collectors.joining(", "));
List<Pod> list=readAll();
return list.toString();
}


protected List<String> readAll() {
protected List<Pod> readAll() {
if(isClusteringEnabled() && client != null) {
try {
return client.getPods(namespace, labels, dump_requests);
Expand Down
56 changes: 56 additions & 0 deletions src/main/java/org/jgroups/protocols/kubernetes/Pod.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.jgroups.protocols.kubernetes;

public class Pod {

private final String name;
private final String ip;
private final String parentDeployment;


public Pod(String name, String ip, String parentDeployment) {
this.name = name;
this.ip = ip;
this.parentDeployment = parentDeployment;
}

public String getName() {
return name;
}

public String getIp() {
return ip;
}

public String getParentDeployment() {
return parentDeployment;
}

@Override
public String toString() {
return "Pod{" +
"name='" + name + '\'' +
", ip='" + ip + '\'' +
", parentDeployment='" + parentDeployment + '\'' +
'}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

Pod pod = (Pod) o;

if (name != null ? !name.equals(pod.name) : pod.name != null) return false;
if (ip != null ? !ip.equals(pod.ip) : pod.ip != null) return false;
return parentDeployment != null ? parentDeployment.equals(pod.parentDeployment) : pod.parentDeployment == null;
}

@Override
public int hashCode() {
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (ip != null ? ip.hashCode() : 0);
result = 31 * result + (parentDeployment != null ? parentDeployment.hashCode() : 0);
return result;
}
}
13 changes: 7 additions & 6 deletions src/test/java/org/jgroups/ping/kube/test/ClientTest.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@

package org.jgroups.ping.kube.test;

import org.jgroups.protocols.kubernetes.Client;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;

import java.util.List;

import static org.junit.Assert.assertEquals;
import org.jgroups.protocols.kubernetes.Client;
import org.jgroups.protocols.kubernetes.Pod;
import org.junit.Assert;
import org.junit.Test;

/**
* @author <a href="mailto:[email protected]">Ales Justin</a>
Expand All @@ -17,10 +18,10 @@ public class ClientTest {
@Test
public void testPods() throws Exception {
Client client = new TestClient();
List<String> pods = client.getPods(null, null, false);
List<Pod> pods = client.getPods(null, null, false);
Assert.assertNotNull(pods);
assertEquals(2, pods.size());
String pod = pods.get(0);
String pod = pods.get(0).getIp();
Assert.assertNotNull(pod);
}

Expand Down
Loading