diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index 8bbbcfed626..83485a52175 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -363,7 +363,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.kubernetes.master.address | <undefined> | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 | | kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 | | kyuubi.kubernetes.namespace.allow.list || The allowed kubernetes namespace list, if it is empty, there is no kubernetes namespace limitation. | set | 1.8.0 | -| kyuubi.kubernetes.spark.appUrlPattern | http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc:{{SPARK_UI_PORT}} | The pattern to generate the spark on kubernetes application UI URL. The pattern should contain placeholders for the application variables. Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`, `{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}` and `{{SPARK_UI_PORT}}`. | string | 1.10.0 | +| kyuubi.kubernetes.spark.appUrlPattern | http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc:{{SPARK_UI_PORT}} | The pattern to generate the spark on kubernetes application UI URL. The pattern should contain placeholders for the application variables. Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`, `{{SPARK_DRIVER_POD_IP}}`, `{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}` and `{{SPARK_UI_PORT}}`. | string | 1.10.0 | | kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled | false | If enabled, Kyuubi server will try to create the `spark.kubernetes.file.upload.path` with permission 777 before submitting the Spark application. | boolean | 1.11.0 | | kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval | PT1M | Kyuubi server use guava cache as the cleanup trigger with time-based eviction, but the eviction would not happened until any get/put operation happened. This option schedule a daemon thread evict cache periodically. | duration | 1.8.1 | | kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind | NONE | Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. Available options are NONE, ALL, COMPLETED and default value is None which means none of the pod will be deleted | string | 1.8.1 | diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index b1589811c92..06c8e7a9d52 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1398,8 +1398,9 @@ object KyuubiConf { buildConf("kyuubi.kubernetes.spark.appUrlPattern") .doc("The pattern to generate the spark on kubernetes application UI URL. " + "The pattern should contain placeholders for the application variables. " + - "Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`, " + - "`{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}` and `{{SPARK_UI_PORT}}`.") + "Available placeholders are `{{SPARK_APP_ID}}`, `{{SPARK_DRIVER_SVC}}`," + + " `{{SPARK_DRIVER_POD_IP}}`, `{{KUBERNETES_NAMESPACE}}`, `{{KUBERNETES_CONTEXT}}`" + + " and `{{SPARK_UI_PORT}}`.") .version("1.10.0") .stringConf .createWithDefault( diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index 6066c951df9..afe7bcc31e0 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -35,6 +35,7 @@ import org.apache.kyuubi.config.KyuubiConf.{KubernetesApplicationStateSource, Ku import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource import org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, COMPLETED, NONE} import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, KILLED, NOT_FOUND, PENDING, RUNNING, UNKNOWN} +import org.apache.kyuubi.engine.KubernetesApplicationUrlSource._ import org.apache.kyuubi.engine.KubernetesResourceEventTypes.KubernetesResourceEventType import org.apache.kyuubi.operation.OperationState import org.apache.kyuubi.server.metadata.MetadataManager @@ -65,6 +66,13 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { private def appStateContainer: String = kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_CONTAINER) + private lazy val sparkAppUrlPattern = kyuubiConf.get(KyuubiConf.KUBERNETES_SPARK_APP_URL_PATTERN) + private lazy val sparkAppUrlSource = if (sparkAppUrlPattern.contains(SPARK_DRIVER_SVC_PATTERN)) { + KubernetesApplicationUrlSource.SVC + } else { + KubernetesApplicationUrlSource.POD + } + // key is kyuubi_unique_key private val appInfoStore: ConcurrentHashMap[String, (KubernetesInfo, ApplicationInfo)] = new ConcurrentHashMap[String, (KubernetesInfo, ApplicationInfo)] @@ -143,11 +151,14 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { val enginePodInformer = client.pods() .withLabel(LABEL_KYUUBI_UNIQUE_KEY) .inform(new SparkEnginePodEventHandler(kubernetesInfo)) - info(s"[$kubernetesInfo] Start Kubernetes Client Informer.") - val engineSvcInformer = client.services() - .inform(new SparkEngineSvcEventHandler(kubernetesInfo)) + info(s"[$kubernetesInfo] Start Kubernetes Client POD Informer.") enginePodInformers.put(kubernetesInfo, enginePodInformer) - engineSvcInformers.put(kubernetesInfo, engineSvcInformer) + if (sparkAppUrlSource == KubernetesApplicationUrlSource.SVC) { + info(s"[$kubernetesInfo] Start Kubernetes Client SVC Informer.") + val engineSvcInformer = client.services() + .inform(new SparkEngineSvcEventHandler(kubernetesInfo)) + engineSvcInformers.put(kubernetesInfo, engineSvcInformer) + } client case None => throw new KyuubiException(s"Fail to build Kubernetes client for $kubernetesInfo") @@ -474,6 +485,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { id = getPodAppId(pod), name = getPodAppName(pod), state = appState, + url = appInfo.url.orElse { + getPodAppUrl(sparkAppUrlSource, sparkAppUrlPattern, kubernetesInfo, pod) + }, error = appError, podName = Some(pod.getMetadata.getName))) }.getOrElse { @@ -483,6 +497,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { id = getPodAppId(pod), name = getPodAppName(pod), state = appState, + url = getPodAppUrl(sparkAppUrlSource, sparkAppUrlPattern, kubernetesInfo, pod), error = appError, podName = Some(pod.getMetadata.getName))) } @@ -500,7 +515,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { val appUrl = buildSparkAppUrl( appUrlPattern, sparkAppId, - sparkDriverSvc, + sparkDriverSvc = Some(sparkDriverSvc), + sparkDriverPodIp = None, kubernetesContext, kubernetesNamespace, sparkUiPort) @@ -695,9 +711,17 @@ object KubernetesApplicationOperation extends Logging { UNKNOWN } + final private val SPARK_APP_ID_PATTERN = "{{SPARK_APP_ID}}" + final private val SPARK_DRIVER_SVC_PATTERN = "{{SPARK_DRIVER_SVC}}" + final private val SPARK_DRIVER_POD_IP_PATTERN = "{{SPARK_DRIVER_POD_IP}}" + final private val KUBERNETES_CONTEXT_PATTERN = "{{KUBERNETES_CONTEXT}}" + final private val KUBERNETES_NAMESPACE_PATTERN = "{{KUBERNETES_NAMESPACE}}" + final private val SPARK_UI_PORT_PATTERN = "{{SPARK_UI_PORT}}" + /** * Replaces all the {{SPARK_APP_ID}} occurrences with the Spark App Id, * {{SPARK_DRIVER_SVC}} occurrences with the Spark Driver Service name, + * {{SPARK_DRIVER_POD_IP}} occurrences with the Spark Driver Pod IP, * {{KUBERNETES_CONTEXT}} occurrences with the Kubernetes Context, * {{KUBERNETES_NAMESPACE}} occurrences with the Kubernetes Namespace, * and {{SPARK_UI_PORT}} occurrences with the Spark UI Port. @@ -705,22 +729,61 @@ object KubernetesApplicationOperation extends Logging { private[kyuubi] def buildSparkAppUrl( sparkAppUrlPattern: String, sparkAppId: String, - sparkDriverSvc: String, + sparkDriverSvc: Option[String], + sparkDriverPodIp: Option[String], kubernetesContext: String, kubernetesNamespace: String, sparkUiPort: Int): String = { - sparkAppUrlPattern - .replace("{{SPARK_APP_ID}}", sparkAppId) - .replace("{{SPARK_DRIVER_SVC}}", sparkDriverSvc) - .replace("{{KUBERNETES_CONTEXT}}", kubernetesContext) - .replace("{{KUBERNETES_NAMESPACE}}", kubernetesNamespace) - .replace("{{SPARK_UI_PORT}}", sparkUiPort.toString) + var appUrl = sparkAppUrlPattern + .replace(SPARK_APP_ID_PATTERN, sparkAppId) + .replace(KUBERNETES_CONTEXT_PATTERN, kubernetesContext) + .replace(KUBERNETES_NAMESPACE_PATTERN, kubernetesNamespace) + .replace(SPARK_UI_PORT_PATTERN, sparkUiPort.toString) + sparkDriverSvc match { + case Some(svc) => appUrl = appUrl.replace(SPARK_DRIVER_SVC_PATTERN, svc) + case None => + } + sparkDriverPodIp match { + case Some(ip) => appUrl = appUrl.replace(SPARK_DRIVER_POD_IP_PATTERN, ip) + case None => + } + appUrl } def getPodAppId(pod: Pod): String = { pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL) } + private[kyuubi] def getPodAppUrl( + sparkAppUrlSource: KubernetesApplicationUrlSource, + sparkAppUrlPattern: String, + kubernetesInfo: KubernetesInfo, + pod: Pod): Option[String] = { + sparkAppUrlSource match { + case KubernetesApplicationUrlSource.SVC => None + case KubernetesApplicationUrlSource.POD => + val podIp = Option(pod.getStatus.getPodIP).filter(_.nonEmpty) + podIp match { + case Some(ip) => pod.getSpec.getContainers.asScala.flatMap( + _.getPorts.asScala).find(_.getName == SPARK_UI_PORT_NAME) + .map(_.getContainerPort).map { sparkUiPort => + buildSparkAppUrl( + sparkAppUrlPattern, + getPodAppId(pod), + sparkDriverSvc = None, + sparkDriverPodIp = Some(ip), + kubernetesContext = kubernetesInfo.context.orNull, + kubernetesNamespace = kubernetesInfo.namespace.orNull, + sparkUiPort = sparkUiPort) + }.orElse { + warn(s"Spark UI port not found in pod ${pod.getMetadata.getName}") + None + } + case None => None + } + } + } + def getPodAppName(pod: Pod): String = { Option(pod.getMetadata.getLabels.get(SPARK_APP_NAME_LABEL)).getOrElse(pod.getMetadata.getName) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationUrlSource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationUrlSource.scala new file mode 100644 index 00000000000..276c50b9593 --- /dev/null +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationUrlSource.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine + +object KubernetesApplicationUrlSource extends Enumeration { + type KubernetesApplicationUrlSource = Value + val SVC, POD = Value +} diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala index de404385240..489a00be407 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala @@ -64,6 +64,7 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite { val sparkAppUrlPattern3 = "http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc" + ".{{KUBERNETES_CONTEXT}}.k8s.io:{{SPARK_UI_PORT}}" + val sparkAppUrlPattern4 = "http://{{SPARK_DRIVER_POD_IP}}:{{SPARK_UI_PORT}}" val sparkAppId = "spark-123" val sparkDriverSvc = "spark-456-driver-svc" @@ -74,7 +75,8 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite { assert(KubernetesApplicationOperation.buildSparkAppUrl( sparkAppUrlPattern1, sparkAppId, - sparkDriverSvc, + Some(sparkDriverSvc), + None, kubernetesContext, kubernetesNamespace, sparkUiPort) === s"http://$sparkAppId.ingress.balabala") @@ -82,7 +84,8 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite { assert(KubernetesApplicationOperation.buildSparkAppUrl( sparkAppUrlPattern2, sparkAppId, - sparkDriverSvc, + Some(sparkDriverSvc), + None, kubernetesContext, kubernetesNamespace, sparkUiPort) === s"http://$sparkDriverSvc.$kubernetesNamespace.svc:$sparkUiPort") @@ -90,11 +93,22 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite { assert(KubernetesApplicationOperation.buildSparkAppUrl( sparkAppUrlPattern3, sparkAppId, - sparkDriverSvc, + Some(sparkDriverSvc), + None, kubernetesContext, kubernetesNamespace, sparkUiPort) === s"http://$sparkDriverSvc.$kubernetesNamespace.svc.$kubernetesContext.k8s.io:$sparkUiPort") + + assert(KubernetesApplicationOperation.buildSparkAppUrl( + sparkAppUrlPattern4, + sparkAppId, + None, + Some("10.69.234.1"), + kubernetesContext, + kubernetesNamespace, + sparkUiPort) === + s"http://10.69.234.1:$sparkUiPort") } test("get kubernetes client initialization info") {