Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Fixed the integration tests
  • Loading branch information
liyinan926 committed Oct 19, 2017
commit e9a68129657b1c65b03729c2891dd93d9c8ecc20
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit

import java.util.concurrent.{CountDownLatch, TimeUnit}

import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod}
import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -109,15 +109,15 @@ private[k8s] class LoggingPodStatusWatcherImpl(
("namespace", pod.getMetadata.getNamespace()),
("labels", pod.getMetadata.getLabels().asScala.mkString(", ")),
("pod uid", pod.getMetadata.getUid),
("creation time", pod.getMetadata.getCreationTimestamp().getTime),
("creation time", formatTime(pod.getMetadata.getCreationTimestamp)),

// spec details
("service account name", pod.getSpec.getServiceAccountName()),
("volumes", pod.getSpec.getVolumes().asScala.map(_.getName).mkString(", ")),
("node name", pod.getSpec.getNodeName()),

// status
("start time", pod.getStatus.getStartTime.getTime),
("start time", formatTime(pod.getStatus.getStartTime)),
("container images",
pod.getStatus.getContainerStatuses()
.asScala
Expand Down Expand Up @@ -162,7 +162,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(
case running: ContainerStateRunning =>
Seq(
("Container state", "Running"),
("Container started at", running.getStartedAt.getTime))
("Container started at", formatTime(running.getStartedAt)))
case waiting: ContainerStateWaiting =>
Seq(
("Container state", "Waiting"),
Expand All @@ -175,4 +175,8 @@ private[k8s] class LoggingPodStatusWatcherImpl(
throw new SparkException(s"Unexpected container status type ${unknown.getClass}.")
}.getOrElse(Seq(("Container state", "N/A")))
}

private def formatTime(time: Time): String = {
if (time != null) time.getTime else "N/A"
}
}
4 changes: 2 additions & 2 deletions resource-managers/kubernetes/integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@
<goal>wget</goal>
</goals>
<configuration>
<url>https://storage.googleapis.com/minikube/releases/v0.16.0/minikube-linux-amd64</url>
<url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-linux-amd64</url>
<outputDirectory>${project.build.directory}/minikube-bin/linux-amd64</outputDirectory>
<outputFileName>minikube</outputFileName>
</configuration>
Expand All @@ -363,7 +363,7 @@
<goal>wget</goal>
</goals>
<configuration>
<url>https://storage.googleapis.com/minikube/releases/v0.16.0/minikube-darwin-amd64</url>
<url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-darwin-amd64</url>
<outputDirectory>${project.build.directory}/minikube-bin/darwin-amd64</outputDirectory>
<outputFileName>minikube</outputFileName>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ private[spark] object Minikube extends Logging {
def getMinikubeStatus: MinikubeStatus.Value = synchronized {
assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
val statusString = executeMinikube("status")
.filter(_.contains("minikubeVM: "))
.filter(_.contains("minikube: "))
.head
.replaceFirst("minikubeVM: ", "")
.replaceFirst("minikube: ", "")
MinikubeStatus.unapply(statusString)
.getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
}
Expand All @@ -78,7 +78,7 @@ private[spark] object Minikube extends Logging {

def deleteMinikube(): Unit = synchronized {
assert(MINIKUBE_EXECUTABLE_DEST.exists, EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
if (getMinikubeStatus != MinikubeStatus.DOES_NOT_EXIST) {
if (getMinikubeStatus != MinikubeStatus.NONE) {
executeMinikube("delete")
} else {
logInfo("Minikube was already not running.")
Expand Down Expand Up @@ -115,10 +115,17 @@ private[spark] object Minikube extends Logging {

private[spark] object MinikubeStatus extends Enumeration {

// The following states are listed according to
// https://github.com/docker/machine/blob/master/libmachine/state/state.go.
val STARTING = status("Starting")
val RUNNING = status("Running")
val PAUSED = status("Paused")
val STOPPING = status("Stopping")
val STOPPED = status("Stopped")
val DOES_NOT_EXIST = status("Does Not Exist")
val ERROR = status("Error")
val TIMEOUT = status("Timeout")
val SAVED = status("Saved")
val NONE = status("")

def status(value: String): Value = new Val(nextId, value)
def unapply(s: String): Option[Value] = values.find(s == _.toString)
Expand Down