SPARK-2450 Adds executor log links to Web UI #3486
Changes from 1 commit
Adds links to stderr/stdout in the Executors tab of the web UI for: (1) Standalone, (2) YARN client, and (3) YARN cluster. This adds log URL support in a general way, so that it is easy to add support for all cluster managers: the log URLs are passed to the executor through environment variables with the `SPARK_LOG_URL_` prefix, so additional logs besides stderr/stdout can also be added. The information is propagated to the UI via the `onExecutorAdded` Spark listener event. Although this commit doesn't add log URLs when running on a Mesos cluster, it should be possible to add them using the same mechanism.
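As a rough sketch of that environment-variable convention (the helper name and URL shape below are illustrative assumptions, not code from this patch), a cluster manager would expose an executor's logs by injecting `SPARK_LOG_URL_*` variables into the executor's launch environment; whatever follows the prefix becomes the log name shown in the UI:

```scala
// Hypothetical helper a cluster manager might use when launching an
// executor; only the SPARK_LOG_URL_ prefix convention comes from this patch.
def buildExecutorLogEnv(logBaseUrl: String): Map[String, String] = Map(
  "SPARK_LOG_URL_STDERR" -> s"$logBaseUrl/stderr",
  "SPARK_LOG_URL_STDOUT" -> s"$logBaseUrl/stdout",
  // Any extra log just needs another variable with the same prefix:
  "SPARK_LOG_URL_GC" -> s"$logBaseUrl/gc")
```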
```diff
@@ -362,6 +362,7 @@ private[spark] class Worker(
           self,
           workerId,
           host,
+          webUiPort,
```
**Contributor:** I think that we can fix this for ephemeral ports by using
**Contributor:** Yep, this fixes it.
**Contributor:** Argh, I shouldn't have let this get committed without making sure this comment had been addressed. This is still broken; I'm fixing it as part of my own follow-up PR, which adds more tests.
**Author:** Oh no. I thought I addressed all the comments including this one. Do you want me to fix it?
**Contributor:** I'm working on a PR now that will fix this and an issue where this doesn't obey SPARK_PUBLIC_DNS (the latter is a bit trickier to test, unfortunately, since it needs some tricky SparkConf + Mockito usage to mock environment variables).
```diff
           sparkHome,
           executorDir,
           akkaUrl,
```
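For context on the thread above: with an ephemeral port (`webUiPort = 0`), the configured value is useless for building links, so the URL must be derived from the port the web UI actually bound to. A hedged sketch of what such a link might look like; `webUi.boundPort`, `appId`, and `execId` are assumed names, not code from this diff:

```scala
// Sketch only: build the worker log link from the UI's bound port rather
// than the (possibly ephemeral) configured port.
val logUrlBase = s"http://$host:${webUi.boundPort}/logPage"
val stderrUrl = s"$logUrlBase/?appId=$appId&executorId=$execId&logType=stderr"
```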
```diff
@@ -49,10 +49,16 @@ private[spark] class CoarseGrainedExecutorBackend(
   override def preStart() {
     logInfo("Connecting to driver: " + driverUrl)
     driver = context.actorSelection(driverUrl)
-    driver ! RegisterExecutor(executorId, hostPort, cores)
+    driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
   }
+
+  def extractLogUrls : Map[String, String] = {
```
**Contributor:** no space before
```diff
+    val prefix = "SPARK_LOG_URL_"
```
**Contributor:** Is this kind of generality necessary? It would be preferable to avoid this kind of env var name trickery if possible.
**Author:** Well, we do this kind of "trickery" in Spark already: we parse out all env variables that start with SPARK_, so this technique is nothing new. Because of this, I don't see the harm in keeping this flexible so that a cluster manager can add other types of logs if it wants to.
**Contributor:** On a related note, I added proper command line parsing to CoarseGrainedExecutorBackend over in #3233, which could be a nicer alternative to env variables.
**Contributor:** By the way, the whole point of declaring this prefix in a separate variable is so that we use the same prefix everywhere. I think it makes sense to put it in the
**Contributor:** Also I would even rename this
```diff
+    sys.env.filterKeys(_.startsWith(prefix))
+      .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
+  }

   override def receiveWithLogging = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
```
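For reference, a minimal standalone sketch of what the extraction above yields, run against a stubbed environment map instead of `sys.env` (the URLs are made up for illustration):

```scala
object ExtractLogUrlsDemo extends App {
  // Stand-in for sys.env; values are illustrative only.
  val env = Map(
    "SPARK_LOG_URL_STDERR" -> "http://worker:8081/logPage/?logType=stderr",
    "SPARK_LOG_URL_STDOUT" -> "http://worker:8081/logPage/?logType=stdout",
    "SPARK_HOME" -> "/opt/spark") // dropped below: wrong prefix

  val prefix = "SPARK_LOG_URL_"
  // Same logic as extractLogUrls above.
  val logUrls = env.filterKeys(_.startsWith(prefix))
    .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))

  println(logUrls) // Map(stderr -> http://..., stdout -> http://...)
}
```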
```diff
@@ -26,7 +26,7 @@ import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
 import org.apache.spark.util.Utils

 /** Summary information about an executor to display in the UI. */
-private case class ExecutorSummaryInfo(
+private[ui] case class ExecutorSummaryInfo(
```
**Contributor:** I don't think this visibility change is necessary.
**Contributor:** It turns out that this is somehow necessary for the MiMa checks to pass. We should add a comment to explain this.
```diff
     id: String,
     hostPort: String,
     rddBlocks: Int,
@@ -40,7 +40,8 @@ private case class ExecutorSummaryInfo(
     totalInputBytes: Long,
     totalShuffleRead: Long,
     totalShuffleWrite: Long,
-    maxMemory: Long)
+    maxMemory: Long,
+    executorLogs : Map[String, String])

 private[ui] class ExecutorsPage(
     parent: ExecutorsTab,
```
```diff
@@ -79,6 +80,7 @@ private[ui] class ExecutorsPage(
         Shuffle Write
       </span>
     </th>
+    <th class="sorttable_nosort">Logs</th>
```
**Contributor:** Should this be conditioned on whether logs actually exist?
**Contributor:** Yes, or at least we should say
```diff
     {if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
   </thead>
   <tbody>
```
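As a hedged sketch of what the reviewers are asking for here (not the committed fix), the header could be emitted only when at least one executor actually reported log URLs; `execInfo` below is an assumed name for the sequence of `ExecutorSummaryInfo` rows the page renders:

```scala
// Sketch only: condition the Logs column on logs actually existing.
val logsHeader: Seq[scala.xml.Node] =
  if (execInfo.exists(_.executorLogs.nonEmpty)) {
    <th class="sorttable_nosort">Logs</th>
  } else {
    Seq.empty
  }
```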
```diff
@@ -138,6 +140,13 @@ private[ui] class ExecutorsPage(
       <td sorttable_customkey={info.totalShuffleWrite.toString}>
         {Utils.bytesToString(info.totalShuffleWrite)}
       </td>
+      <td>
+        {
+          info.executorLogs.map(entry => {
+            <div><a href={s"${entry._2}"}>{entry._1}</a></div>
+          })
+        }
+      </td>
       {
         if (threadDumpEnabled) {
           val encodedId = URLEncoder.encode(info.id, "UTF-8")
```
```diff
@@ -168,6 +177,7 @@ private[ui] class ExecutorsPage(
     val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
     val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
     val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
+    val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)

     new ExecutorSummaryInfo(
       execId,
@@ -183,7 +193,8 @@
       totalInputBytes,
       totalShuffleRead,
       totalShuffleWrite,
-      maxMem
+      maxMem,
+      executorLogs
     )
   }
 }
```
```diff
@@ -383,7 +383,8 @@ private[spark] object JsonProtocol {
   def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
     ("Host" -> executorInfo.executorHost) ~
-    ("Total Cores" -> executorInfo.totalCores)
+    ("Total Cores" -> executorInfo.totalCores) ~
+    ("Log Urls" -> mapToJson(executorInfo.logUrlMap))
```
**Contributor:** Should this use
**Contributor:** Yes, I agree that we should have backwards-compatibility tests for this.
**Contributor:** Actually, I was mistaken: we don't need backwards-compatibility tests because this ExecutorInfo class is new in 1.3. We should revert my commit to add the compatibility tests, since it just adds clutter now.
**Author:** Yes, this is correct. The previous commit that was blocking this was my patch to add the ExecutorInfo. I totally blanked too, so my mistake.
```diff
   }

   /** ------------------------------ *
```
```diff
@@ -792,7 +793,8 @@ private[spark] object JsonProtocol {
   def executorInfoFromJson(json: JValue): ExecutorInfo = {
     val executorHost = (json \ "Host").extract[String]
     val totalCores = (json \ "Total Cores").extract[Int]
-    new ExecutorInfo(executorHost, totalCores)
+    val logUrls = mapFromJson(json \ "Log Urls").toMap
+    new ExecutorInfo(executorHost, totalCores, logUrls)
   }

   /** -------------------------------- *
```
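A quick sanity sketch of the round trip through the two methods above. This is illustrative only: `JsonProtocol` is `private[spark]`, so code like this compiles only inside Spark's own packages:

```scala
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.JsonProtocol

val info = new ExecutorInfo(
  "worker-host", 4,
  Map("stdout" -> "http://worker-host:8081/logPage/?logType=stdout"))

// Serialize and parse back; the log URL map should survive the trip.
val restored = JsonProtocol.executorInfoFromJson(JsonProtocol.executorInfoToJson(info))
assert(restored.logUrlMap == info.logUrlMap)
```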
New test file (59 lines added):

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy

import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
import org.apache.spark.{SparkContext, LocalSparkContext}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}

import scala.collection.mutable

class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext
  with BeforeAndAfter with BeforeAndAfterAll {

  /** Length of time to wait while draining listener events. */
  val WAIT_TIMEOUT_MILLIS = 10000

  before {
    sc = new SparkContext("local-cluster[2,1,512]", "test")
  }

  test("verify log urls get propagated from workers") {
    val listener = new SaveExecutorInfo
    sc.addSparkListener(listener)

    val rdd1 = sc.parallelize(1 to 100, 4)
    val rdd2 = rdd1.map(_.toString)
    rdd2.setName("Target RDD")
    rdd2.count()

    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
    listener.addedExecutorInfos.foreach(e => {
      assert(e._2.logUrlMap.nonEmpty)
    })
  }

  private class SaveExecutorInfo extends SparkListener {
    val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()

    override def onExecutorAdded(executor : SparkListenerExecutorAdded) {
      addedExecutorInfos(executor.executorId) = executor.executorInfo
    }
  }
}
```
It looks like `$host` here might be a machine's hostname, but we probably want this to reflect SPARK_PUBLIC_DNS (or whatever the new system property equivalent is) so that we generate an externally-accessible URL.
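A minimal sketch of the change this comment suggests, assuming `host` and `webUiPort` are the hostname and UI port already in scope (illustrative only, not the committed follow-up):

```scala
// Prefer the externally visible address when one is configured;
// SPARK_PUBLIC_DNS is a real Spark setting, the surrounding names are assumed.
val publicHost = sys.env.getOrElse("SPARK_PUBLIC_DNS", host)
val logUrlBase = s"http://$publicHost:$webUiPort/logPage/"
```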