Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
package org.apache.spark.deploy.yarn

import java.io.{File, FileOutputStream, OutputStreamWriter}
import java.net.URL
import java.util.Properties
import java.util.concurrent.TimeUnit

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.io.Source

import com.google.common.base.Charsets.UTF_8
import com.google.common.io.ByteStreams
Expand Down Expand Up @@ -344,18 +344,20 @@ private object YarnClusterDriver extends Logging with Matchers {
assert(info.logUrlMap.nonEmpty)
}

// If we are running in yarn-cluster mode, verify that driver logs are downloadable.
// If we are running in yarn-cluster mode, verify that driver log links are present and are
// in the expected format.
if (conf.get("spark.master") == "yarn-cluster") {
assert(listener.driverLogs.nonEmpty)
val driverLogs = listener.driverLogs.get
assert(driverLogs.size === 2)
assert(driverLogs.containsKey("stderr"))
assert(driverLogs.containsKey("stdout"))
val stderr = driverLogs("stderr") // YARN puts everything in stderr.
val lines = Source.fromURL(stderr).getLines()
// Look for a line that contains YarnClusterSchedulerBackend, since that is guaranteed in
// cluster mode.
assert(lines.exists(_.contains("YarnClusterSchedulerBackend")))
val urlStr = driverLogs("stderr")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM; Source isn't used now is it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I will clean the import up. Also, I think I am going to remove the check for the hostname, because it can fail when the machine has multiple hostnames and the one in the URL is not the one returned by Utils.getLocalHostName() (the test will just rely on the URL format instead).

// Ensure that this is a valid URL, else this will throw an exception
new URL(urlStr)
val containerId = YarnSparkHadoopUtil.get.getContainerId
val user = Utils.getCurrentUserName()
assert(urlStr.endsWith(s"/node/containerlogs/$containerId/$user/stderr?start=0"))
}
}

Expand Down