diff --git a/repl/pom.xml b/repl/pom.xml
index 861bbd7c49654..553d5eb79a256 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -167,14 +167,4 @@
     </plugins>
   </build>
 
-  <profiles>
-    <profile>
-      <id>scala-2.12</id>
-      <properties>
-        <extra.source.dir>scala-2.12/src/main/scala</extra.source.dir>
-        <extra.testsource.dir>scala-2.12/src/test/scala</extra.testsource.dir>
-      </properties>
-    </profile>
-  </profiles>
-
 </project>
diff --git a/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala
deleted file mode 100644
index ffb2e5f5db7e2..0000000000000
--- a/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.repl
-
-import java.io.BufferedReader
-
-import scala.tools.nsc.Settings
-import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}
-import scala.tools.nsc.util.stringFromStream
-import scala.util.Properties.{javaVersion, javaVmName, versionString}
-
-/**
- * A Spark-specific interactive shell.
- */
-class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
-    extends ILoop(in0, out) {
-  def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
-  def this() = this(None, new JPrintWriter(Console.out, true))
-
-  val initializationCommands: Seq[String] = Seq(
-    """
-    @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
-        org.apache.spark.repl.Main.sparkSession
-      } else {
-        org.apache.spark.repl.Main.createSparkSession()
-      }
-    @transient val sc = {
-      val _sc = spark.sparkContext
-      if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
-        val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
-        if (proxyUrl != null) {
-          println(
-            s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
-        } else {
-          println(s"Spark Context Web UI is available at Spark Master Public URL")
-        }
-      } else {
-        _sc.uiWebUrl.foreach {
-          webUrl => println(s"Spark context Web UI available at ${webUrl}")
-        }
-      }
-      println("Spark context available as 'sc' " +
-        s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
-      println("Spark session available as 'spark'.")
-      _sc
-    }
-    """,
-    "import org.apache.spark.SparkContext._",
-    "import spark.implicits._",
-    "import spark.sql",
-    "import org.apache.spark.sql.functions._"
-  )
-
-  def initializeSpark() {
-    intp.beQuietDuring {
-      savingReplayStack { // remove the commands from session history.
-        initializationCommands.foreach(command)
-      }
-    }
-  }
-
-  /** Print a welcome message */
-  override def printWelcome() {
-    import org.apache.spark.SPARK_VERSION
-    echo("""Welcome to
-      ____              __
-     / __/__  ___ _____/ /__
-    _\ \/ _ \/ _ `/ __/  '_/
-   /___/ .__/\_,_/_/ /_/\_\   version %s
-      /_/
-         """.format(SPARK_VERSION))
-    val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
-      versionString, javaVmName, javaVersion)
-    echo(welcomeMsg)
-    echo("Type in expressions to have them evaluated.")
-    echo("Type :help for more information.")
-  }
-
-  /** Available commands */
-  override def commands: List[LoopCommand] = standardCommands
-
-  /**
-   * We override `createInterpreter` because we need to initialize Spark *before* the REPL
-   * sees any files, so that the Spark context is visible in those files. This is a bit of a
-   * hack, but there isn't another hook available to us at this point.
-   */
-  override def createInterpreter(): Unit = {
-    super.createInterpreter()
-    initializeSpark()
-  }
-
-  override def resetCommand(line: String): Unit = {
-    super.resetCommand(line)
-    initializeSpark()
-    echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.")
-  }
-
-  override def replay(): Unit = {
-    initializeSpark()
-    super.replay()
-  }
-
-}
-
-object SparkILoop {
-
-  /**
-   * Creates an interpreter loop with default settings and feeds
-   * the given code to it as input.
-   */
-  def run(code: String, sets: Settings = new Settings): String = {
-    import java.io.{ BufferedReader, StringReader, OutputStreamWriter }
-
-    stringFromStream { ostream =>
-      Console.withOut(ostream) {
-        val input = new BufferedReader(new StringReader(code))
-        val output = new JPrintWriter(new OutputStreamWriter(ostream), true)
-        val repl = new SparkILoop(input, output)
-
-        if (sets.classpath.isDefault) {
-          sets.classpath.value = sys.props("java.class.path")
-        }
-        repl process sets
-      }
-    }
-  }
-  def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString)
-}
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
similarity index 82%
rename from repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
rename to repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 94265267b1f97..aa9aa2793b8b3 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -24,7 +24,6 @@ import scala.Predef.{println => _, _}
 // scalastyle:on println
 import scala.concurrent.Future
 import scala.reflect.classTag
-import scala.reflect.internal.util.ScalaClassLoader.savingContextLoader
 import scala.reflect.io.File
 import scala.tools.nsc.{GenericRunnerSettings, Properties}
 import scala.tools.nsc.Settings
@@ -33,7 +32,7 @@ import scala.tools.nsc.interpreter.{AbstractOrMissingHandler, ILoop, IMain, JPri
 import scala.tools.nsc.interpreter.{NamedParam, SimpleReader, SplashLoop, SplashReader}
 import scala.tools.nsc.interpreter.StdReplTags.tagOfIMain
 import scala.tools.nsc.util.stringFromStream
-import scala.util.Properties.{javaVersion, javaVmName, versionString}
+import scala.util.Properties.{javaVersion, javaVmName, versionNumberString, versionString}
 
 /**
  * A Spark-specific interactive shell.
@@ -43,10 +42,32 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
   def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
   def this() = this(None, new JPrintWriter(Console.out, true))
 
+  /**
+   * TODO: Remove the following `override` when support for Scala 2.11 is dropped.
+   * Scala 2.11 has a bug in resolving imported types in class constructors and extends
+   * clauses, which is fixed in Scala 2.12 but was never back-ported to Scala 2.11.x.
+   * As a result, we copied the fixes into `SparkILoopInterpreter`. See SPARK-22393 for details.
+   */
   override def createInterpreter(): Unit = {
-    intp = new SparkILoopInterpreter(settings, out)
+    if (isScala2_11) {
+      if (addedClasspath != "") {
+        settings.classpath append addedClasspath
+      }
+      // scalastyle:off classforname
+      // Have to use the default classloader to match the one used in
+      // `classOf[Settings]` and `classOf[JPrintWriter]`.
+      intp = Class.forName("org.apache.spark.repl.SparkILoopInterpreter")
+        .getDeclaredConstructor(Seq(classOf[Settings], classOf[JPrintWriter]): _*)
+        .newInstance(Seq(settings, out): _*)
+        .asInstanceOf[IMain]
+      // scalastyle:on classforname
+    } else {
+      super.createInterpreter()
+    }
   }
 
+  private val isScala2_11 = versionNumberString.startsWith("2.11")
+
   val initializationCommands: Seq[String] = Seq(
     """
     @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
@@ -124,6 +145,26 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
     super.replay()
   }
 
+  /**
+   * TODO: Remove `runClosure` when support for Scala 2.11 is dropped.
+   */
+  private def runClosure(body: => Boolean): Boolean = {
+    if (isScala2_11) {
+      // In Scala 2.11, there is a bug where `interpret` can set the current thread's
+      // context classloader but fail to reset it to its previous state when returning
+      // from that method. This is fixed by SI-8521 https://github.com/scala/scala/pull/5657,
+      // which was never back-ported to Scala 2.11.x. The following is a workaround.
+      val original = Thread.currentThread().getContextClassLoader
+      try {
+        body
+      } finally {
+        Thread.currentThread().setContextClassLoader(original)
+      }
+    } else {
+      body
+    }
+  }
+
   /**
    * The following code is mostly a copy of `process` implementation in `ILoop.scala` in Scala
    *
@@ -138,7 +179,7 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
    * We should remove this duplication once Scala provides a way to load our custom initialization
    * code, and also customize the ordering of printing welcome message.
    */
-  override def process(settings: Settings): Boolean = savingContextLoader {
+  override def process(settings: Settings): Boolean = runClosure {
     def newReader = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true))
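
The `runClosure` guard above is an instance of a general save/restore pattern around a leaky callee. A minimal standalone sketch of the same pattern, with a hypothetical `withRestoredContextLoader` helper (not part of Spark), assuming the body may clobber the thread's context classloader the way Scala 2.11's `interpret` does:

```scala
object ContextLoaderDemo {
  // Hypothetical helper mirroring the patch's `runClosure`: snapshot the
  // thread's context classloader, run the body, and restore the snapshot
  // in `finally` so the caller's loader survives a misbehaving body.
  def withRestoredContextLoader[T](body: => T): T = {
    val original = Thread.currentThread().getContextClassLoader
    try body
    finally Thread.currentThread().setContextClassLoader(original)
  }

  def main(args: Array[String]): Unit = {
    val before = Thread.currentThread().getContextClassLoader
    withRestoredContextLoader {
      // Simulate a buggy callee that swaps the loader and never resets it.
      Thread.currentThread().setContextClassLoader(
        new java.net.URLClassLoader(Array.empty[java.net.URL], before))
    }
    println(Thread.currentThread().getContextClassLoader eq before) // true
  }
}
```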
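
Likewise, the reflective construction in `createInterpreter` avoids a compile-time reference to the 2.11-only `SparkILoopInterpreter`, so the same source tree builds under both Scala versions. A sketch of the same `Class.forName` + `getDeclaredConstructor` + `newInstance` technique, using a hypothetical `Plugin` class in place of the interpreter:

```scala
// Hypothetical stand-in for a class that might be absent at compile time.
class Plugin(name: String) {
  override def toString: String = s"Plugin($name)"
}

object ReflectiveLoadDemo {
  def main(args: Array[String]): Unit = {
    // Resolve the class by name, select the (String) constructor explicitly,
    // and instantiate it; the patch additionally casts the result to `IMain`.
    val plugin = Class.forName("Plugin")
      .getDeclaredConstructor(classOf[String])
      .newInstance("demo")
    println(plugin) // Plugin(demo)
  }
}
```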