diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
index e02cb7a968314..9493c48e0052a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
@@ -220,8 +220,20 @@ private[spark] object UI {
     .stringConf
     .createOptional
 
+  val CUSTOM_DRIVER_LOG_URL = ConfigBuilder("spark.ui.custom.driver.log.url")
+    .doc("Specifies custom Spark driver log url for supporting external log service instead of " +
+      "using cluster managers' application log urls in the Spark UI. Spark will support " +
+      "some path variables via patterns which can vary on cluster manager. Please check the " +
+      "documentation for your cluster manager to see which patterns are supported, if any. " +
+      "This configuration replaces original log urls in event log, which will be also effective " +
+      "when accessing the application on history server. The new log urls must be permanent, " +
+      "otherwise you might have dead link for driver log urls.")
+    .version("4.0.0")
+    .stringConf
+    .createOptional
+
   val CUSTOM_EXECUTOR_LOG_URL = ConfigBuilder("spark.ui.custom.executor.log.url")
-    .doc("Specifies custom spark executor log url for supporting external log service instead of " +
+    .doc("Specifies custom Spark executor log url for supporting external log service instead of " +
       "using cluster managers' application log urls in the Spark UI. Spark will support " +
       "some path variables via patterns which can vary on cluster manager. Please check the " +
       "documentation for your cluster manager to see which patterns are supported, if any. " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 4b3a16b4d3f60..fd73f7352d5ba 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -144,6 +144,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("cleanup-decommission-execs")
   }
 
+  override def getDriverLogUrls: Option[Map[String, String]] = {
+    val logUrlHandler = new ExecutorLogUrlHandler(conf.get(UI.CUSTOM_DRIVER_LOG_URL))
+    getDriverAttributes.map(attr => logUrlHandler.applyPattern(Map.empty, attr)).filter(_.nonEmpty)
+  }
+
   class DriverEndpoint extends IsolatedThreadSafeRpcEndpoint with Logging {
 
     override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv
diff --git a/docs/configuration.md b/docs/configuration.md
index 6833d4e54fd03..abb3f99dcb891 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1654,11 +1654,25 @@ Apart from these, the following properties are also available, and may be useful
   </td>
   <td>2.1.0</td>
 </tr>
+<tr>
+  <td><code>spark.ui.custom.driver.log.url</code></td>
+  <td>(none)</td>
+  <td>
+    Specifies custom Spark driver log URL for supporting external log service instead of using cluster
+    managers' application log URLs in Spark UI. Spark will support some path variables via patterns
+    which can vary on cluster manager. Please check the documentation for your cluster manager to
+    see which patterns are supported, if any.<br/>
+    Please note that this configuration also replaces original log urls in event log,
+    which will be also effective when accessing the application on history server. The new log urls must be
+    permanent, otherwise you might have dead link for driver log urls.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.ui.custom.executor.log.url</code></td>
   <td>(none)</td>
   <td>
-    Specifies custom spark executor log URL for supporting external log service instead of using cluster
+    Specifies custom Spark executor log URL for supporting external log service instead of using cluster
     managers' application log URLs in Spark UI. Spark will support some path variables via patterns
     which can vary on cluster manager. Please check the documentation for your cluster manager to
     see which patterns are supported, if any.<br/>
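The driver-side wiring above mirrors what Spark already does for executors: `getDriverLogUrls` builds an `ExecutorLogUrlHandler` from `spark.ui.custom.driver.log.url` and applies it to the map returned by `getDriverAttributes`, substituting `{{VAR}}` placeholders in the configured URL. A minimal sketch of that substitution is below; it is not the actual `ExecutorLogUrlHandler` code, and the object name and sample URL are made up for illustration.

```scala
// Illustrative sketch only (not the actual ExecutorLogUrlHandler implementation):
// expand {{VAR}} placeholders in a custom log URL pattern from driver attributes.
object LogUrlPatternSketch {
  private val Placeholder = "\\{\\{([A-Za-z0-9_]+)\\}\\}".r

  // Return None when a referenced variable has no matching attribute, so a
  // half-substituted URL is never produced.
  def expand(pattern: String, attributes: Map[String, String]): Option[String] = {
    val vars = Placeholder.findAllMatchIn(pattern).map(_.group(1)).toSet
    if (vars.subsetOf(attributes.keySet)) {
      Some(vars.foldLeft(pattern)((url, v) => url.replace(s"{{$v}}", attributes(v))))
    } else {
      None
    }
  }

  def main(args: Array[String]): Unit = {
    val attrs = Map(
      "APP_ID" -> "app-example-123",
      "KUBERNETES_NAMESPACE" -> "default",
      "KUBERNETES_POD_NAME" -> "spark-pi-driver")
    println(expand("https://my.custom.url/logs?app={{APP_ID}}&pod={{KUBERNETES_POD_NAME}}", attrs))
    // Some(https://my.custom.url/logs?app=app-example-123&pod=spark-pi-driver)
  }
}
```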

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 655b30756a298..4c37abff9af3e 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1662,6 +1662,28 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
   <td>3.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.ui.custom.driver.log.url</code></td>
+  <td>(none)</td>
+  <td>
+    Specifies custom Spark driver log URL for supporting external log service instead of using cluster
+    managers' application log URLs in Spark UI. Spark will support some path variables via patterns
+    which can vary on cluster manager. Spark Kubernetes cluster manager supports the following path variables:
+    <ul>
+      <li><code>{{APP_ID}}</code></li>
+      <li><code>{{KUBERNETES_NAMESPACE}}</code></li>
+      <li><code>{{KUBERNETES_POD_NAME}}</code></li>
+      <li><code>{{FILE_NAME}}</code></li>
+    </ul>
+    Please note that this configuration also replaces original log urls in event log,
+    which will be also effective when accessing the application on history server. The new log urls must be
+    permanent, otherwise you might have dead link for driver log urls.<br/>
+    Example: Config value "https://my.custom.url/logs?app={{APP_ID}}"
+    adds for application "app-example-123" this link to the Spark UI:
+    https://my.custom.url/logs?app=app-example-123
+  </td>
+  <td>4.0.0</td>
+</tr>
 </table>
 
 #### Pod template properties
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index ead3188aa6494..b42e22a3e4646 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -74,6 +74,7 @@ object Constants {
   val ENV_EXECUTOR_POD_NAME = "SPARK_EXECUTOR_POD_NAME"
   val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
   val ENV_CLASSPATH = "SPARK_CLASSPATH"
+  val ENV_DRIVER_POD_NAME = "SPARK_DRIVER_POD_NAME"
   val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
   val ENV_SPARK_CONF_DIR = "SPARK_CONF_DIR"
   val ENV_SPARK_USER = "SPARK_USER"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index bd198deed3d5b..202994a9507a0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -129,6 +129,12 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
           .withNewFieldRef("v1", "status.podIP")
           .build())
         .endEnv()
+      .addNewEnv()
+        .withName(ENV_DRIVER_POD_NAME)
+        .withValueFrom(new EnvVarSourceBuilder()
+          .withNewFieldRef("v1", "metadata.name")
+          .build())
+        .endEnv()
       .editOrNewResources()
         .addToRequests("cpu", driverCpuQuantity)
         .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 4e4634504a0f3..f8c3149c8420c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -104,6 +104,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
     conf.getOption("spark.app.id").getOrElse(appId)
   }
 
+  override def getDriverAttributes: Option[Map[String, String]] = Some(Map(
+    "LOG_FILES" -> "log",
+    "APP_ID" -> applicationId(),
+    "KUBERNETES_NAMESPACE" -> namespace,
+    "KUBERNETES_POD_NAME" -> Option(System.getenv(ENV_DRIVER_POD_NAME)).getOrElse("[null]")
+  ))
+
   override def start(): Unit = {
     super.start()
     // Must be called before setting the executors
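With the Kubernetes backend exposing `APP_ID`, `KUBERNETES_NAMESPACE` and `KUBERNETES_POD_NAME` (plus `LOG_FILES`, which the handler uses for `{{FILE_NAME}}` expansion), an application opts in simply by setting the new property. A hedged usage sketch follows; the log-service endpoint and app name are placeholders, not values defined by this patch, and the property can equally be passed with `--conf` on `spark-submit`.

```scala
// Hypothetical usage sketch: the endpoint and app name below are placeholders.
// The {{...}} variables are expanded by the K8s scheduler backend at runtime.
import org.apache.spark.{SparkConf, SparkContext}

object CustomDriverLogUrlExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("custom-driver-log-url-example")
      .set("spark.ui.custom.driver.log.url",
        "https://logs.example.com/driver?app={{APP_ID}}&ns={{KUBERNETES_NAMESPACE}}" +
          "&pod={{KUBERNETES_POD_NAME}}")
    val sc = new SparkContext(conf)
    // ... job code; the driver log link then points at the external log service ...
    sc.stop()
  }
}
```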
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index bf022ac630158..80df1940b1dbe 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -108,6 +108,11 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") &&
       envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP")))
 
+    assert(configuredPod.container.getEnv.asScala.exists(envVar =>
+      envVar.getName.equals(ENV_DRIVER_POD_NAME) &&
+      envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") &&
+      envVar.getValueFrom.getFieldRef.getFieldPath.equals("metadata.name")))
+
     val resourceRequirements = configuredPod.container.getResources
     val requests = resourceRequirements.getRequests.asScala
     assert(amountAndFormat(requests("cpu")) === "2")
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index b2e4a7182a774..f07efc7e6cd70 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.internal.config.UI
 import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorKilled, LiveListenerBus, TaskSchedulerImpl}
@@ -47,6 +48,11 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     .set("spark.app.id", TEST_SPARK_APP_ID)
     .set(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL.key, "soLong")
     .set(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE.key, "cruelWorld")
+    .set(
+      UI.CUSTOM_DRIVER_LOG_URL.key,
+      "https://my-custom.url/api/logs?applicationId={{APP_ID}}&namespace={{KUBERNETES_NAMESPACE}}" +
+        "&pod_name={{KUBERNETES_POD_NAME}}"
+    )
 
   @Mock
   private var sc: SparkContext = _
@@ -259,4 +265,52 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     endpoint.receiveAndReply(context).apply(GenerateExecID("cheeseBurger"))
     verify(context).reply("1")
   }
+
+  test("Driver attributes") {
+    assert(schedulerBackendUnderTest.getDriverAttributes === Some(Map(
+      "LOG_FILES" -> "log",
+      "APP_ID" -> "spark-app-id",
+      "KUBERNETES_NAMESPACE" -> "default",
+      "KUBERNETES_POD_NAME" -> "[null]"
+    )))
+    withEnvs(ENV_DRIVER_POD_NAME -> "pod.name") {
+      assert(schedulerBackendUnderTest.getDriverAttributes === Some(Map(
+        "LOG_FILES" -> "log",
+        "APP_ID" -> "spark-app-id",
+        "KUBERNETES_NAMESPACE" -> "default",
+        "KUBERNETES_POD_NAME" -> "pod.name"
+      )))
+    }
+  }
+
+  test("Driver log urls") {
+    assert(schedulerBackendUnderTest.getDriverLogUrls === Some(Map(
+      "log" -> ("https://my-custom.url/api/logs?applicationId=spark-app-id&namespace=default" +
+        "&pod_name=[null]")
+    )))
+    withEnvs(ENV_DRIVER_POD_NAME -> "pod.name") {
+      assert(schedulerBackendUnderTest.getDriverLogUrls === Some(Map(
+        "log" -> ("https://my-custom.url/api/logs?applicationId=spark-app-id&namespace=default" +
+          "&pod_name=pod.name")
+      )))
+    }
+  }
+
+  private def withEnvs(pairs: (String, String)*)(f: => Unit): Unit = {
+    val readonlyEnv = System.getenv()
+    val field = readonlyEnv.getClass.getDeclaredField("m")
+    field.setAccessible(true)
+    val modifiableEnv = field.get(readonlyEnv).asInstanceOf[java.util.Map[String, String]]
+    try {
+      for ((k, v) <- pairs) {
+        assert(!modifiableEnv.containsKey(k))
+        modifiableEnv.put(k, v)
+      }
+      f
+    } finally {
+      for ((k, _) <- pairs) {
+        modifiableEnv.remove(k)
+      }
+    }
+  }
 }
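One note on the `withEnvs` helper in the suite above: `System.getenv()` returns an unmodifiable map, so the test reflectively reaches into its backing `m` field, asserts each key is not already set, and restores the environment in the `finally` block so other tests are unaffected. On Java 17 this kind of reflective access into `java.util` internals generally needs the package opened to the test JVM (for example `--add-opens java.base/java.util=ALL-UNNAMED`), which Spark's shared test JVM options are expected to provide.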