2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/Logging.scala
@@ -121,6 +121,7 @@ trait Logging {
if (usingLog4j12) {
val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
if (!log4j12Initialized) {
// scalastyle:off println
if (Utils.isInInterpreter) {
val replDefaultLogProps = "org/apache/spark/log4j-defaults-repl.properties"
Option(Utils.getSparkClassLoader.getResource(replDefaultLogProps)) match {
@@ -141,6 +142,7 @@
System.err.println(s"Spark was unable to load $defaultLogProps")
}
}
// scalastyle:on println
}
}
Logging.initialized = true
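A minimal sketch of the suppression pattern this PR applies throughout: the Scalastyle println check is switched off only around output that is genuinely meant for the console, then switched back on immediately. (The object and method names below are illustrative, not from the diff.)

    object Banner {
      def show(version: String): Unit = {
        // scalastyle:off println
        // Intentional console output; everything else should go through logInfo/logWarning/logError.
        println(s"Welcome to Spark version $version")
        // scalastyle:on println
      }
    }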
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/api/r/RBackend.scala
@@ -95,7 +95,9 @@ private[spark] class RBackend {
private[spark] object RBackend extends Logging {
def main(args: Array[String]): Unit = {
if (args.length < 1) {
// scalastyle:off println
System.err.println("Usage: RBackend <tempFilePath>")
// scalastyle:on println
System.exit(-1)
}
val sparkRBackend = new RBackend()
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -161,7 +161,9 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
dataOut.write(elem.asInstanceOf[Array[Byte]])
} else if (deserializer == SerializationFormats.STRING) {
// write string (for StringRRDD)
// scalastyle:off println
printOut.println(elem)
// scalastyle:on println
}
}

30 changes: 16 additions & 14 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -118,26 +118,26 @@ private class ClientEndpoint(
def pollAndReportStatus(driverId: String) {
// Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread
// is fine.
println("... waiting before polling master for driver state")
logInfo("... waiting before polling master for driver state")
Thread.sleep(5000)
println("... polling master for driver state")
logInfo("... polling master for driver state")
val statusResponse =
activeMasterEndpoint.askWithRetry[DriverStatusResponse](RequestDriverStatus(driverId))
statusResponse.found match {
case false =>
println(s"ERROR: Cluster master did not recognize $driverId")
logError(s"ERROR: Cluster master did not recognize $driverId")
System.exit(-1)
case true =>
println(s"State of $driverId is ${statusResponse.state.get}")
logInfo(s"State of $driverId is ${statusResponse.state.get}")
// Worker node, if present
(statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
println(s"Driver running on $hostPort ($id)")
logInfo(s"Driver running on $hostPort ($id)")
case _ =>
}
// Exception, if present
statusResponse.exception.map { e =>
println(s"Exception from cluster was: $e")
logError(s"Exception from cluster was: $e")
e.printStackTrace()
System.exit(-1)
}
@@ -148,7 +148,7 @@
override def receive: PartialFunction[Any, Unit] = {

case SubmitDriverResponse(master, success, driverId, message) =>
-      println(message)
+      logInfo(message)
if (success) {
activeMasterEndpoint = master
pollAndReportStatus(driverId.get)
@@ -158,7 +158,7 @@


case KillDriverResponse(master, driverId, success, message) =>
-      println(message)
+      logInfo(message)
if (success) {
activeMasterEndpoint = master
pollAndReportStatus(driverId)
@@ -169,32 +169,32 @@

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (!lostMasters.contains(remoteAddress)) {
println(s"Error connecting to master $remoteAddress.")
logError(s"Error connecting to master $remoteAddress.")
lostMasters += remoteAddress
// Note that this heuristic does not account for the fact that a Master can recover within
// the lifetime of this client. Thus, once a Master is lost it is lost to us forever. This
// is not currently a concern, however, because this client does not retry submissions.
if (lostMasters.size >= masterEndpoints.size) {
println("No master is available, exiting.")
logError("No master is available, exiting.")
System.exit(-1)
}
}
}

override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
if (!lostMasters.contains(remoteAddress)) {
println(s"Error connecting to master ($remoteAddress).")
println(s"Cause was: $cause")
logError(s"Error connecting to master ($remoteAddress).")
logError(s"Cause was: $cause")
lostMasters += remoteAddress
if (lostMasters.size >= masterEndpoints.size) {
println("No master is available, exiting.")
logError("No master is available, exiting.")
System.exit(-1)
}
}
}

override def onError(cause: Throwable): Unit = {
println(s"Error processing messages, exiting.")
logError(s"Error processing messages, exiting.")
cause.printStackTrace()
System.exit(-1)
}
@@ -209,10 +209,12 @@ private class ClientEndpoint(
*/
object Client {
def main(args: Array[String]) {
// scalastyle:off println
if (!sys.props.contains("SPARK_SUBMIT")) {
println("WARNING: This client is deprecated and will be removed in a future version of Spark")
println("Use ./bin/spark-submit with \"--master spark://host:port\"")
}
// scalastyle:on println

val conf = new SparkConf()
val driverArgs = new ClientArguments(args)
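The Client.scala changes above all follow one rule: driver-side status messages move from println to the Logging trait touched in the first hunk, so they pick up log levels, timestamps, and log4j routing, while the user-facing deprecation warning in main stays on println inside suppression markers. A condensed sketch of the conversion (the class name is illustrative):

    import org.apache.spark.Logging

    class StatusReporter extends Logging {
      def report(driverId: String, found: Boolean): Unit = {
        if (found) {
          logInfo(s"State of $driverId retrieved")                        // was println(...)
        } else {
          logError(s"ERROR: Cluster master did not recognize $driverId")  // was println(...)
        }
      }
    }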
core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -72,9 +72,11 @@ private[deploy] class ClientArguments(args: Array[String]) {
cmd = "launch"

if (!ClientArguments.isValidJarUrl(_jarUrl)) {
// scalastyle:off println
println(s"Jar url '${_jarUrl}' is not in valid format.")
println(s"Must be a jar file path in URL format " +
"(e.g. hdfs://host:port/XX.jar, file:///XX.jar)")
// scalastyle:on println
printUsageAndExit(-1)
}

@@ -110,7 +112,9 @@ private[deploy] class ClientArguments(args: Array[String]) {
| (default: $DEFAULT_SUPERVISE)
| -v, --verbose Print more debugging output
""".stripMargin
// scalastyle:off println
System.err.println(usage)
// scalastyle:on println
System.exit(exitCode)
}
}
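ClientArguments.isValidJarUrl itself is not part of this diff. A plausible validator consistent with the error message above (an assumption for illustration, not Spark's actual implementation):

    import java.net.URI
    import scala.util.Try

    object JarUrlCheck {
      def isValidJarUrl(s: String): Boolean =
        Try(new URI(s)).toOption.exists { uri =>
          // require a scheme (hdfs://, file://, ...) and a path ending in .jar
          uri.getScheme != null && uri.getPath != null && uri.getPath.endsWith(".jar")
        }
    }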
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -85,7 +85,9 @@ object RRunner {
}
System.exit(returnCode)
} else {
// scalastyle:off println
System.err.println("SparkR backend did not initialize in " + backendTimeout + " seconds")
// scalastyle:on println
System.exit(-1)
}
}
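RRunner exits with -1 when the R backend misses its initialization deadline. A sketch of that handshake using a CountDownLatch (the latch, the timeout wiring, and the names are assumptions; only the error branch is taken from the diff):

    import java.util.concurrent.{CountDownLatch, TimeUnit}

    object BackendGate {
      val initialized = new CountDownLatch(1)  // a backend thread calls countDown() once it is listening
      val backendTimeout = 120                 // seconds; illustrative default

      def awaitOrExit(): Unit = {
        if (!initialized.await(backendTimeout, TimeUnit.SECONDS)) {
          // scalastyle:off println
          System.err.println("SparkR backend did not initialize in " + backendTimeout + " seconds")
          // scalastyle:on println
          System.exit(-1)
        }
      }
    }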
18 changes: 18 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -82,6 +82,7 @@ object SparkSubmit {

private val CLASS_NOT_FOUND_EXIT_STATUS = 101

// scalastyle:off println
// Exposed for testing
private[spark] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)
private[spark] var printStream: PrintStream = System.err
@@ -102,11 +103,14 @@ object SparkSubmit {
printStream.println("Type --help for more information.")
exitFn(0)
}
// scalastyle:on println

def main(args: Array[String]): Unit = {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs)
@@ -160,7 +164,9 @@ object SparkSubmit {
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
// scalastyle:off println
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
exitFn(1)
} else {
throw e
@@ -178,7 +184,9 @@
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
if (args.isStandaloneCluster && args.useRest) {
try {
// scalastyle:off println
printStream.println("Running Spark using the REST application submission protocol.")
// scalastyle:on println
doRunMain()
} catch {
// Fail over to use the legacy submission gateway
@@ -558,13 +566,15 @@ object SparkSubmit {
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean): Unit = {
// scalastyle:off println
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
printStream.println("\n")
}
// scalastyle:on println

val loader =
if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
@@ -592,8 +602,10 @@ object SparkSubmit {
case e: ClassNotFoundException =>
e.printStackTrace(printStream)
if (childMainClass.contains("thriftserver")) {
// scalastyle:off println
printStream.println(s"Failed to load main class $childMainClass.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}
@@ -766,7 +778,9 @@ private[spark] object SparkSubmitUtils {
brr.setRoot(repo)
brr.setName(s"repo-${i + 1}")
cr.add(brr)
// scalastyle:off println
printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
// scalastyle:on println
}
}

@@ -829,7 +843,9 @@ private[spark] object SparkSubmitUtils {
val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
val dd = new DefaultDependencyDescriptor(ri, false, false)
dd.addDependencyConfiguration(ivyConfName, ivyConfName)
// scalastyle:off println
printStream.println(s"${dd.getDependencyId} added as a dependency")
// scalastyle:on println
md.addDependency(dd)
}
}
@@ -896,9 +912,11 @@ private[spark] object SparkSubmitUtils {
ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
new File(alternateIvyCache, "jars")
}
// scalastyle:off println
printStream.println(
s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
printStream.println(s"The jars for the packages stored in: $packagesDirectory")
// scalastyle:on println
// create a pattern matcher
ivySettings.addMatcher(new GlobPatternMatcher)
// create the dependency resolvers
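SparkSubmit keeps exitFn and printStream as mutable vars (marked "Exposed for testing" above) so a test can capture console output and intercept exits instead of killing the JVM. A sketch of the same seam (the surrounding object and the test wiring are illustrative):

    import java.io.{ByteArrayOutputStream, PrintStream}

    object Submitter {
      var exitFn: Int => Unit = (code: Int) => System.exit(code)
      var printStream: PrintStream = System.err

      def fail(message: String): Unit = {
        // scalastyle:off println
        printStream.println(s"ERROR: $message")
        // scalastyle:on println
        exitFn(1)
      }
    }

    object SubmitterSpec {
      def main(args: Array[String]): Unit = {
        // Redirect output and turn exits into exceptions before exercising the code.
        val captured = new ByteArrayOutputStream()
        Submitter.printStream = new PrintStream(captured)
        Submitter.exitFn = code => throw new IllegalStateException(s"exit($code)")
        try Submitter.fail("bad arguments") catch {
          case _: IllegalStateException => assert(captured.toString.contains("ERROR"))
        }
      }
    }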
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -79,13 +79,15 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
/** Default properties present in the currently defined defaults file. */
lazy val defaultSparkProperties: HashMap[String, String] = {
val defaultProperties = new HashMap[String, String]()
// scalastyle:off println
if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
Option(propertiesFile).foreach { filename =>
Utils.getPropertiesFromFile(filename).foreach { case (k, v) =>
defaultProperties(k) = v
if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
}
}
// scalastyle:on println
defaultProperties
}

@@ -451,6 +453,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}

private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
// scalastyle:off println
val outStream = SparkSubmit.printStream
if (unknownParam != null) {
outStream.println("Unknown/unsupported param " + unknownParam)
@@ -540,6 +543,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
outStream.println("CLI options:")
outStream.println(getSqlShellOptions())
}
// scalastyle:on println

SparkSubmit.exitFn(exitCode)
}
core/src/main/scala/org/apache/spark/deploy/client/TestExecutor.scala
@@ -19,7 +19,9 @@ package org.apache.spark.deploy.client

private[spark] object TestExecutor {
def main(args: Array[String]) {
// scalastyle:off println
println("Hello world!")
// scalastyle:on println
while (true) {
Thread.sleep(1000)
}
core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -56,6 +56,7 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin
Utils.loadDefaultSparkProperties(conf, propertiesFile)

private def printUsageAndExit(exitCode: Int) {
// scalastyle:off println
System.err.println(
"""
|Usage: HistoryServer [options]
@@ -84,6 +85,7 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin
| spark.history.fs.updateInterval How often to reload log data from storage
| (in seconds, default: 10)
|""".stripMargin)
// scalastyle:on println
System.exit(exitCode)
}

core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -85,6 +85,7 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) {
* Print usage and exit JVM with the given exit code.
*/
private def printUsageAndExit(exitCode: Int) {
// scalastyle:off println
System.err.println(
"Usage: Master [options]\n" +
"\n" +
@@ -95,6 +96,7 @@
" --webui-port PORT Port for web UI (default: 8080)\n" +
" --properties-file FILE Path to a custom Spark properties file.\n" +
" Default is conf/spark-defaults.conf.")
// scalastyle:on println
System.exit(exitCode)
}
}
core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
@@ -54,7 +54,9 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf:

case ("--master" | "-m") :: value :: tail =>
if (!value.startsWith("mesos://")) {
// scalastyle:off println
System.err.println("Cluster dispatcher only supports mesos (uri begins with mesos://)")
// scalastyle:on println
System.exit(1)
}
masterUrl = value.stripPrefix("mesos://")
@@ -73,7 +75,9 @@

case Nil => {
if (masterUrl == null) {
// scalastyle:off println
System.err.println("--master is required")
// scalastyle:on println
printUsageAndExit(1)
}
}
@@ -83,6 +87,7 @@
}

private def printUsageAndExit(exitCode: Int): Unit = {
// scalastyle:off println
System.err.println(
"Usage: MesosClusterDispatcher [options]\n" +
"\n" +
Expand All @@ -96,6 +101,7 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf:
" Zookeeper for persistence\n" +
" --properties-file FILE Path to a custom Spark properties file.\n" +
" Default is conf/spark-defaults.conf.")
// scalastyle:on println
System.exit(exitCode)
}
}
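MesosClusterDispatcherArguments, like the other *Arguments classes in this PR, parses flags by recursive pattern matching on a List, as in the --master case above. A self-contained sketch of the idiom (the fallback handling and defaults are illustrative):

    object ArgParser {
      @annotation.tailrec
      def parse(args: List[String], masterUrl: Option[String] = None): Option[String] =
        args match {
          case ("--master" | "-m") :: value :: tail if value.startsWith("mesos://") =>
            parse(tail, Some(value.stripPrefix("mesos://")))
          case Nil =>
            masterUrl  // None tells the caller to print usage and exit
          case unknown :: _ =>
            sys.error(s"Unrecognized option: $unknown")
        }
    }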
core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -60,7 +60,9 @@ object DriverWrapper {
rpcEnv.shutdown()

case _ =>
// scalastyle:off println
System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")
// scalastyle:on println
System.exit(-1)
}
}