Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/configuration/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.kubernetes.master.address | <undefined> | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 |
| kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 |
| kyuubi.kubernetes.namespace.allow.list || The allowed kubernetes namespace list, if it is empty, there is no kubernetes namespace limitation. | set | 1.8.0 |
| kyuubi.kubernetes.spark.appUrlPattern | http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc:{{SPARK_UI_PORT}} | The pattern to generate the spark on kubernetes application UI URL. The pattern should contain placeholders for the application variables. Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`, `{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}` and `{{SPARK_UI_PORT}}`. | string | 1.10.0 |
| kyuubi.kubernetes.spark.appUrlPattern | http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc:{{SPARK_UI_PORT}} | The pattern to generate the spark on kubernetes application UI URL. The pattern should contain placeholders for the application variables. Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`, `{{SPARK_DRIVER_POD_IP}}`, `{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}` and `{{SPARK_UI_PORT}}`. | string | 1.10.0 |
| kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled | false | If enabled, Kyuubi server will try to create the `spark.kubernetes.file.upload.path` with permission 777 before submitting the Spark application. | boolean | 1.11.0 |
| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval | PT1M | Kyuubi server use guava cache as the cleanup trigger with time-based eviction, but the eviction would not happened until any get/put operation happened. This option schedule a daemon thread evict cache periodically. | duration | 1.8.1 |
| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind | NONE | Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. Available options are NONE, ALL, COMPLETED and default value is None which means none of the pod will be deleted | string | 1.8.1 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1398,8 +1398,9 @@ object KyuubiConf {
buildConf("kyuubi.kubernetes.spark.appUrlPattern")
.doc("The pattern to generate the spark on kubernetes application UI URL. " +
"The pattern should contain placeholders for the application variables. " +
"Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`, " +
"`{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}` and `{{SPARK_UI_PORT}}`.")
"Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`," +
" `{{SPARK_DRIVER_POD_IP}}`, `{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}`" +
" and `{{SPARK_UI_PORT}}`.")
.version("1.10.0")
.stringConf
.createWithDefault(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.kyuubi.config.KyuubiConf.{KubernetesApplicationStateSource, Ku
import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource
import org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, COMPLETED, NONE}
import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, KILLED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
import org.apache.kyuubi.engine.KubernetesApplicationUrlSource._
import org.apache.kyuubi.engine.KubernetesResourceEventTypes.KubernetesResourceEventType
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.MetadataManager
Expand Down Expand Up @@ -65,6 +66,13 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
private def appStateContainer: String =
kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_CONTAINER)

private lazy val sparkAppUrlPattern = kyuubiConf.get(KyuubiConf.KUBERNETES_SPARK_APP_URL_PATTERN)
private lazy val sparkAppUrlSource = if (sparkAppUrlPattern.contains(SPARK_DRIVER_SVC_PATTERN)) {
KubernetesApplicationUrlSource.SVC
} else {
KubernetesApplicationUrlSource.POD
}

// key is kyuubi_unique_key
private val appInfoStore: ConcurrentHashMap[String, (KubernetesInfo, ApplicationInfo)] =
new ConcurrentHashMap[String, (KubernetesInfo, ApplicationInfo)]
Expand Down Expand Up @@ -143,11 +151,14 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
val enginePodInformer = client.pods()
.withLabel(LABEL_KYUUBI_UNIQUE_KEY)
.inform(new SparkEnginePodEventHandler(kubernetesInfo))
info(s"[$kubernetesInfo] Start Kubernetes Client Informer.")
val engineSvcInformer = client.services()
.inform(new SparkEngineSvcEventHandler(kubernetesInfo))
info(s"[$kubernetesInfo] Start Kubernetes Client POD Informer.")
enginePodInformers.put(kubernetesInfo, enginePodInformer)
engineSvcInformers.put(kubernetesInfo, engineSvcInformer)
if (sparkAppUrlSource == KubernetesApplicationUrlSource.SVC) {
info(s"[$kubernetesInfo] Start Kubernetes Client SVC Informer.")
val engineSvcInformer = client.services()
.inform(new SparkEngineSvcEventHandler(kubernetesInfo))
engineSvcInformers.put(kubernetesInfo, engineSvcInformer)
}
client

case None => throw new KyuubiException(s"Fail to build Kubernetes client for $kubernetesInfo")
Expand Down Expand Up @@ -474,6 +485,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
id = getPodAppId(pod),
name = getPodAppName(pod),
state = appState,
url = appInfo.url.orElse {
getPodAppUrl(sparkAppUrlSource, sparkAppUrlPattern, kubernetesInfo, pod)
},
error = appError,
podName = Some(pod.getMetadata.getName)))
}.getOrElse {
Expand All @@ -483,6 +497,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
id = getPodAppId(pod),
name = getPodAppName(pod),
state = appState,
url = getPodAppUrl(sparkAppUrlSource, sparkAppUrlPattern, kubernetesInfo, pod),
error = appError,
podName = Some(pod.getMetadata.getName)))
}
Expand All @@ -500,7 +515,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
val appUrl = buildSparkAppUrl(
appUrlPattern,
sparkAppId,
sparkDriverSvc,
sparkDriverSvc = Some(sparkDriverSvc),
sparkDriverPodIp = None,
kubernetesContext,
kubernetesNamespace,
sparkUiPort)
Expand Down Expand Up @@ -695,32 +711,79 @@ object KubernetesApplicationOperation extends Logging {
UNKNOWN
}

final private val SPARK_APP_ID_PATTERN = "{{SPARK_APP_ID}}"
final private val SPARK_DRIVER_SVC_PATTERN = "{{SPARK_DRIVER_SVC}}"
final private val SPARK_DRIVER_POD_IP_PATTERN = "{{SPARK_DRIVER_POD_IP}}"
final private val KUBERNETES_CONTEXT_PATTERN = "{{KUBERNETES_CONTEXT}}"
final private val KUBERNETES_NAMESPACE_PATTERN = "{{KUBERNETES_NAMESPACE}}"
final private val SPARK_UI_PORT_PATTERN = "{{SPARK_UI_PORT}}"

/**
* Replaces all the {{SPARK_APP_ID}} occurrences with the Spark App Id,
* {{SPARK_DRIVER_SVC}} occurrences with the Spark Driver Service name,
* {{SPARK_DRIVER_POD_IP}} occurrences with the Spark Driver Pod IP,
* {{KUBERNETES_CONTEXT}} occurrences with the Kubernetes Context,
* {{KUBERNETES_NAMESPACE}} occurrences with the Kubernetes Namespace,
* and {{SPARK_UI_PORT}} occurrences with the Spark UI Port.
*/
private[kyuubi] def buildSparkAppUrl(
sparkAppUrlPattern: String,
sparkAppId: String,
sparkDriverSvc: String,
sparkDriverSvc: Option[String],
sparkDriverPodIp: Option[String],
kubernetesContext: String,
kubernetesNamespace: String,
sparkUiPort: Int): String = {
sparkAppUrlPattern
.replace("{{SPARK_APP_ID}}", sparkAppId)
.replace("{{SPARK_DRIVER_SVC}}", sparkDriverSvc)
.replace("{{KUBERNETES_CONTEXT}}", kubernetesContext)
.replace("{{KUBERNETES_NAMESPACE}}", kubernetesNamespace)
.replace("{{SPARK_UI_PORT}}", sparkUiPort.toString)
var appUrl = sparkAppUrlPattern
.replace(SPARK_APP_ID_PATTERN, sparkAppId)
.replace(KUBERNETES_CONTEXT_PATTERN, kubernetesContext)
.replace(KUBERNETES_NAMESPACE_PATTERN, kubernetesNamespace)
.replace(SPARK_UI_PORT_PATTERN, sparkUiPort.toString)
sparkDriverSvc match {
case Some(svc) => appUrl = appUrl.replace(SPARK_DRIVER_SVC_PATTERN, svc)
case None =>
}
sparkDriverPodIp match {
case Some(ip) => appUrl = appUrl.replace(SPARK_DRIVER_POD_IP_PATTERN, ip)
case None =>
}
appUrl
}

def getPodAppId(pod: Pod): String = {
pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL)
}

private[kyuubi] def getPodAppUrl(
sparkAppUrlSource: KubernetesApplicationUrlSource,
sparkAppUrlPattern: String,
kubernetesInfo: KubernetesInfo,
pod: Pod): Option[String] = {
sparkAppUrlSource match {
case KubernetesApplicationUrlSource.SVC => None
case KubernetesApplicationUrlSource.POD =>
val podIp = Option(pod.getStatus.getPodIP).filter(_.nonEmpty)
podIp match {
case Some(ip) => pod.getSpec.getContainers.asScala.flatMap(
_.getPorts.asScala).find(_.getName == SPARK_UI_PORT_NAME)
.map(_.getContainerPort).map { sparkUiPort =>
buildSparkAppUrl(
sparkAppUrlPattern,
getPodAppId(pod),
sparkDriverSvc = None,
sparkDriverPodIp = Some(ip),
kubernetesContext = kubernetesInfo.context.orNull,
kubernetesNamespace = kubernetesInfo.namespace.orNull,
sparkUiPort = sparkUiPort)
}.orElse {
warn(s"Spark UI port not found in pod ${pod.getMetadata.getName}")
None
}
case None => None
}
}
}

def getPodAppName(pod: Pod): String = {
Option(pod.getMetadata.getLabels.get(SPARK_APP_NAME_LABEL)).getOrElse(pod.getMetadata.getName)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine

object KubernetesApplicationUrlSource extends Enumeration {
type KubernetesApplicationUrlSource = Value
val SVC, POD = Value
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite {
val sparkAppUrlPattern3 =
"http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc" +
".{{KUBERNETES_CONTEXT}}.k8s.io:{{SPARK_UI_PORT}}"
val sparkAppUrlPattern4 = "http://{{SPARK_DRIVER_POD_IP}}:{{SPARK_UI_PORT}}"

val sparkAppId = "spark-123"
val sparkDriverSvc = "spark-456-driver-svc"
Expand All @@ -74,27 +75,40 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite {
assert(KubernetesApplicationOperation.buildSparkAppUrl(
sparkAppUrlPattern1,
sparkAppId,
sparkDriverSvc,
Some(sparkDriverSvc),
None,
kubernetesContext,
kubernetesNamespace,
sparkUiPort) === s"http://$sparkAppId.ingress.balabala")

assert(KubernetesApplicationOperation.buildSparkAppUrl(
sparkAppUrlPattern2,
sparkAppId,
sparkDriverSvc,
Some(sparkDriverSvc),
None,
kubernetesContext,
kubernetesNamespace,
sparkUiPort) === s"http://$sparkDriverSvc.$kubernetesNamespace.svc:$sparkUiPort")

assert(KubernetesApplicationOperation.buildSparkAppUrl(
sparkAppUrlPattern3,
sparkAppId,
sparkDriverSvc,
Some(sparkDriverSvc),
None,
kubernetesContext,
kubernetesNamespace,
sparkUiPort) ===
s"http://$sparkDriverSvc.$kubernetesNamespace.svc.$kubernetesContext.k8s.io:$sparkUiPort")

assert(KubernetesApplicationOperation.buildSparkAppUrl(
sparkAppUrlPattern4,
sparkAppId,
None,
Some("10.69.234.1"),
kubernetesContext,
kubernetesNamespace,
sparkUiPort) ===
s"http://10.69.234.1:$sparkUiPort")
}

test("get kubernetes client initialization info") {
Expand Down
Loading