Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions dev/deps/spark-deps-hadoop-2.6
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,14 @@ jta-1.1.jar
jtransforms-2.4.0.jar
jul-to-slf4j-1.7.16.jar
kryo-shaded-4.0.2.jar
kubernetes-client-3.0.0.jar
kubernetes-model-2.0.0.jar
kubernetes-client-4.1.2.jar
kubernetes-model-4.1.2.jar
kubernetes-model-common-4.1.2.jar
leveldbjni-all-1.8.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2.17.jar
logging-interceptor-3.8.1.jar
logging-interceptor-3.12.0.jar
lz4-java-1.4.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
Expand Down
7 changes: 4 additions & 3 deletions dev/deps/spark-deps-hadoop-2.7
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,14 @@ jta-1.1.jar
jtransforms-2.4.0.jar
jul-to-slf4j-1.7.16.jar
kryo-shaded-4.0.2.jar
kubernetes-client-3.0.0.jar
kubernetes-model-2.0.0.jar
kubernetes-client-4.1.2.jar
kubernetes-model-4.1.2.jar
kubernetes-model-common-4.1.2.jar
leveldbjni-all-1.8.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2.17.jar
logging-interceptor-3.8.1.jar
logging-interceptor-3.12.0.jar
lz4-java-1.4.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
Expand Down
7 changes: 4 additions & 3 deletions dev/deps/spark-deps-hadoop-3.1
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,14 @@ kerby-pkix-1.0.1.jar
kerby-util-1.0.1.jar
kerby-xdr-1.0.1.jar
kryo-shaded-4.0.2.jar
kubernetes-client-3.0.0.jar
kubernetes-model-2.0.0.jar
kubernetes-client-4.1.2.jar
kubernetes-model-4.1.2.jar
kubernetes-model-common-4.1.2.jar
leveldbjni-all-1.8.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2.17.jar
logging-interceptor-3.8.1.jar
logging-interceptor-3.12.0.jar
lz4-java-1.4.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
Expand Down
2 changes: 1 addition & 1 deletion resource-managers/kubernetes/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<name>Spark Project Kubernetes</name>
<properties>
<sbt.project.name>kubernetes</sbt.project.name>
<kubernetes.client.version>3.0.0</kubernetes.client.version>
<kubernetes.client.version>4.1.2</kubernetes.client.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ private[spark] class MountVolumesFeatureStep(
val volumeBuilder = spec.volumeConf match {
case KubernetesHostPathVolumeConf(hostPath) =>
new VolumeBuilder()
.withHostPath(new HostPathVolumeSource(hostPath))
.withHostPath(new HostPathVolumeSourceBuilder()
.withPath(hostPath)
.build())

case KubernetesPVCVolumeConf(claimName) =>
new VolumeBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod}
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

Expand Down Expand Up @@ -174,7 +174,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(
}.getOrElse(Seq(("Container state", "N/A")))
}

private def formatTime(time: Time): String = {
if (time != null) time.getTime else "N/A"
private def formatTime(time: String): String = {
if (time != null || time != "") time else "N/A"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this part of the codebase has changed on master, but I am not porting back all the changes, just fixing this locally.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ object ExecutorLifecycleTestUtils {
def deletedExecutor(executorId: Long): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId))
.editOrNewMetadata()
.withNewDeletionTimestamp("523012521")
.withDeletionTimestamp("523012521")
.endMetadata()
.build()
}
Expand Down
2 changes: 1 addition & 1 deletion resource-managers/kubernetes/integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<download-maven-plugin.version>1.3.0</download-maven-plugin.version>
<exec-maven-plugin.version>1.4.0</exec-maven-plugin.version>
<extraScalaTestArgs></extraScalaTestArgs>
<kubernetes-client.version>3.0.0</kubernetes-client.version>
<kubernetes-client.version>4.1.2</kubernetes-client.version>
<scala-maven-plugin.version>3.2.2</scala-maven-plugin.version>
<scalatest-maven-plugin.version>1.0</scalatest-maven-plugin.version>
<sbt.project.name>kubernetes-integration-tests</sbt.project.name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.spark.deploy.k8s.integrationtest.backend.minikube

import java.io.File
import java.nio.file.Paths

import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
Expand All @@ -26,8 +25,18 @@ import org.apache.spark.internal.Logging

// TODO support windows
private[spark] object Minikube extends Logging {

private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60
private val HOST_PREFIX = "host:"
private val KUBELET_PREFIX = "kubelet:"
private val APISERVER_PREFIX = "apiserver:"
private val KUBECTL_PREFIX = "kubectl:"
private val MINIKUBE_VM_PREFIX = "minikubeVM: "
private val MINIKUBE_PREFIX = "minikube: "
private val MINIKUBE_PATH = ".minikube"

def logVersion(): Unit = {
logInfo(executeMinikube("version").mkString("\n"))
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need this in order to support latest minikube versions.


def getMinikubeIp: String = {
val outputs = executeMinikube("ip")
Expand All @@ -38,12 +47,21 @@ private[spark] object Minikube extends Logging {

def getMinikubeStatus: MinikubeStatus.Value = {
val statusString = executeMinikube("status")
.filter(line => line.contains("minikubeVM: ") || line.contains("minikube:"))
.head
.replaceFirst("minikubeVM: ", "")
.replaceFirst("minikube: ", "")
MinikubeStatus.unapply(statusString)
logInfo(s"Minikube status command output:\n$statusString")
// up to minikube version v0.30.0 use this to check for minikube status
val oldMinikube = statusString
.filter(line => line.contains(MINIKUBE_VM_PREFIX) || line.contains(MINIKUBE_PREFIX))

if (oldMinikube.isEmpty) {
getIfNewMinikubeStatus(statusString)
} else {
val finalStatusString = oldMinikube
.head
.replaceFirst(MINIKUBE_VM_PREFIX, "")
.replaceFirst(MINIKUBE_PREFIX, "")
MinikubeStatus.unapply(finalStatusString)
.getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
}
}

def getKubernetesClient: DefaultKubernetesClient = {
Expand All @@ -52,13 +70,46 @@ private[spark] object Minikube extends Logging {
val kubernetesConf = new ConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(kubernetesMaster)
.withCaCertFile(Paths.get(userHome, ".minikube", "ca.crt").toFile.getAbsolutePath)
.withClientCertFile(Paths.get(userHome, ".minikube", "apiserver.crt").toFile.getAbsolutePath)
.withClientKeyFile(Paths.get(userHome, ".minikube", "apiserver.key").toFile.getAbsolutePath)
.withCaCertFile(
Paths.get(userHome, MINIKUBE_PATH, "ca.crt").toFile.getAbsolutePath)
.withClientCertFile(
Paths.get(userHome, MINIKUBE_PATH, "apiserver.crt").toFile.getAbsolutePath)
.withClientKeyFile(
Paths.get(userHome, MINIKUBE_PATH, "apiserver.key").toFile.getAbsolutePath)
.build()
new DefaultKubernetesClient(kubernetesConf)
}

// Covers minikube status output after Minikube V0.30.
private def getIfNewMinikubeStatus(statusString: Seq[String]): MinikubeStatus.Value = {
val hostString = statusString.find(_.contains(s"$HOST_PREFIX "))
val kubeletString = statusString.find(_.contains(s"$KUBELET_PREFIX "))
val apiserverString = statusString.find(_.contains(s"$APISERVER_PREFIX "))
val kubectlString = statusString.find(_.contains(s"$KUBECTL_PREFIX "))

if (hostString.isEmpty || kubeletString.isEmpty
|| apiserverString.isEmpty || kubectlString.isEmpty) {
MinikubeStatus.NONE
} else {
val status1 = hostString.get.replaceFirst(s"$HOST_PREFIX ", "")
val status2 = kubeletString.get.replaceFirst(s"$KUBELET_PREFIX ", "")
val status3 = apiserverString.get.replaceFirst(s"$APISERVER_PREFIX ", "")
val status4 = kubectlString.get.replaceFirst(s"$KUBECTL_PREFIX ", "")
if (!status4.contains("Correctly Configured:")) {
MinikubeStatus.NONE
} else {
val stats = List(status1, status2, status3)
.map(MinikubeStatus.unapply)
.map(_.getOrElse(throw new IllegalStateException(s"Unknown status $statusString")))
if (stats.exists(_ != MinikubeStatus.RUNNING)) {
MinikubeStatus.NONE
} else {
MinikubeStatus.RUNNING
}
}
}
}

private def executeMinikube(action: String, args: String*): Seq[String] = {
ProcessUtils.executeProcess(
Array("bash", "-c", s"minikube $action") ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS)
Expand Down