22 changes: 11 additions & 11 deletions bin/load-spark-env.cmd
@@ -35,21 +35,21 @@ if [%SPARK_ENV_LOADED%] == [] (

rem Setting SPARK_SCALA_VERSION if not already set.

rem set ASSEMBLY_DIR2="%SPARK_HOME%\assembly\target\scala-2.11"
rem set ASSEMBLY_DIR1="%SPARK_HOME%\assembly\target\scala-2.12"
set ASSEMBLY_DIR2="%SPARK_HOME%\assembly\target\scala-2.11"
set ASSEMBLY_DIR1="%SPARK_HOME%\assembly\target\scala-2.12"

if [%SPARK_SCALA_VERSION%] == [] (

rem if exist %ASSEMBLY_DIR2% if exist %ASSEMBLY_DIR1% (
rem echo "Presence of build for multiple Scala versions detected."
rem echo "Either clean one of them or, set SPARK_SCALA_VERSION=2.11 in spark-env.cmd."
rem exit 1
rem )
rem if exist %ASSEMBLY_DIR2% (
if exist %ASSEMBLY_DIR2% if exist %ASSEMBLY_DIR1% (
echo "Presence of build for multiple Scala versions detected."
echo "Either clean one of them or, set SPARK_SCALA_VERSION in spark-env.cmd."
exit 1
)
if exist %ASSEMBLY_DIR2% (
set SPARK_SCALA_VERSION=2.11
rem ) else (
rem set SPARK_SCALA_VERSION=2.12
rem )
) else (
set SPARK_SCALA_VERSION=2.12
)
)
exit /b 0

22 changes: 11 additions & 11 deletions bin/load-spark-env.sh
@@ -46,18 +46,18 @@ fi

if [ -z "$SPARK_SCALA_VERSION" ]; then

#ASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"
#ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.12"
ASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"
ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.12"

#if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
# echo -e "Presence of build for multiple Scala versions detected." 1>&2
# echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION=2.11 in spark-env.sh.' 1>&2
# exit 1
#fi
if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
echo -e "Presence of build for multiple Scala versions detected." 1>&2
echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION in spark-env.sh.' 1>&2
exit 1
fi

#if [ -d "$ASSEMBLY_DIR2" ]; then
if [ -d "$ASSEMBLY_DIR2" ]; then
export SPARK_SCALA_VERSION="2.11"
#else
# export SPARK_SCALA_VERSION="2.12"
#fi
else
export SPARK_SCALA_VERSION="2.12"
fi
fi
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -89,6 +89,14 @@ trait FutureAction[T] extends Future[T] {
*/
override def value: Option[Try[T]]

// These two methods must be implemented in Scala 2.12, but won't be used by Spark

def transform[S](f: (Try[T]) => Try[S])(implicit executor: ExecutionContext): Future[S] =
Member Author: These cause a MiMa warning, because they're new methods in a trait that might be extended by users. I'll have to go back and remember whether this is an actual problem, because the trait is providing a default implementation here.

Member Author: I tried compiling a small app that calls RDD.countAsync (which returns a FutureAction) and even implements a custom FutureAction, compiled it against 2.2.0, then ran it against this build, and it worked. I believe this may be legitimately excluded from MiMa.
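If an exclusion does turn out to be appropriate, a minimal sketch of what it could look like follows, assuming the MimaExcludes.scala style of listing ProblemFilters; the exact problem type MiMa reports for new trait methods is an assumption here:

import com.typesafe.tools.mima.core._

// Hypothetical exclusions for the two new FutureAction methods; whether they are
// actually safe to exclude is the question discussed above.
object FutureActionMimaExcludes {
  val filters = Seq(
    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.FutureAction.transform"),
    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.FutureAction.transformWith")
  )
}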

throw new UnsupportedOperationException()

def transformWith[S](f: (Try[T]) => Future[S])(implicit executor: ExecutionContext): Future[S] =
throw new UnsupportedOperationException()

/**
* Blocks and returns the result of this job.
*/
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -56,9 +56,9 @@ private[spark] object JavaUtils {
val ui = underlying.iterator
var prev : Option[A] = None

def hasNext: Boolean = ui.hasNext
override def hasNext: Boolean = ui.hasNext

def next(): Entry[A, B] = {
override def next(): Entry[A, B] = {
val (k, v) = ui.next()
prev = Some(k)
new ju.Map.Entry[A, B] {
@@ -74,7 +74,7 @@ private[spark] object JavaUtils {
}
}

def remove() {
override def remove() {
prev match {
case Some(k) =>
underlying match {
@@ -33,7 +33,7 @@ import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.Matchers
import org.scalatest.mock.MockitoSugar
import org.scalatest.mockito.MockitoSugar

import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
@@ -39,7 +39,7 @@ import org.openqa.selenium.WebDriver
import org.openqa.selenium.htmlunit.HtmlUnitDriver
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.concurrent.Eventually
import org.scalatest.mock.MockitoSugar
import org.scalatest.mockito.MockitoSugar
import org.scalatest.selenium.WebBrowser

import org.apache.spark._
@@ -33,7 +33,7 @@ import org.mockito.Mockito.{inOrder, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.concurrent.Eventually
import org.scalatest.mock.MockitoSugar
import org.scalatest.mockito.MockitoSugar

import org.apache.spark._
import org.apache.spark.TaskState.TaskState
@@ -28,8 +28,8 @@ import scala.util.{Failure, Success, Try}

import com.google.common.io.CharStreams
import org.mockito.Mockito._
import org.scalatest.ShouldMatchers
import org.scalatest.mock.MockitoSugar
import org.scalatest.Matchers
import org.scalatest.mockito.MockitoSugar

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.network.{BlockDataManager, BlockTransferService}
@@ -38,7 +38,7 @@ import org.apache.spark.network.shuffle.BlockFetchingListener
import org.apache.spark.storage.{BlockId, ShuffleBlockId}
import org.apache.spark.util.ThreadUtils

class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar with ShouldMatchers {
class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar with Matchers {
test("security default off") {
val conf = new SparkConf()
.set("spark.app.id", "app-id")
@@ -28,7 +28,7 @@ import org.apache.spark.network.BlockDataManager
class NettyBlockTransferServiceSuite
extends SparkFunSuite
with BeforeAndAfterEach
with ShouldMatchers {
with Matchers {

private var service0: NettyBlockTransferService = _
private var service1: NettyBlockTransferService = _
@@ -17,7 +17,7 @@

package org.apache.spark.rpc.netty

import org.scalatest.mock.MockitoSugar
import org.scalatest.mockito.MockitoSugar

import org.apache.spark._
import org.apache.spark.network.client.TransportClient
@@ -22,7 +22,7 @@ import org.mockito.Mockito.{never, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfterEach
import org.scalatest.mock.MockitoSugar
import org.scalatest.mockito.MockitoSugar

import org.apache.spark._
import org.apache.spark.internal.config
@@ -54,7 +54,10 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val rdd = new RDD[String](sc, List()) {
override def getPartitions = Array[Partition](StubPartition(0))
override def compute(split: Partition, context: TaskContext) = {
context.addTaskCompletionListener(context => TaskContextSuite.completed = true)
context.addTaskCompletionListener(new TaskCompletionListener {
Member Author: Changes like this resolve an ambiguity where two overloads of a method exist: one with a signature taking a lambda, the other taking a trait/interface with a single method, and both could fit. In cases like this I resolved in favor of implementing a specific listener class.
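For illustration, a minimal sketch of the ambiguity, assuming the two addTaskCompletionListener overloads on TaskContext (one taking a TaskCompletionListener, one taking a TaskContext => Unit function); this is not the suite code itself:

import org.apache.spark.TaskContext
import org.apache.spark.util.TaskCompletionListener

object ListenerAmbiguitySketch {
  def register(ctx: TaskContext): Unit = {
    // Under Scala 2.12 a function literal is also SAM-convertible to the
    // single-method TaskCompletionListener trait, so a lambda can fit both overloads:
    // ctx.addTaskCompletionListener(c => println(c.partitionId()))  // may be rejected as ambiguous
    // Spelling out the listener type picks the trait overload explicitly:
    ctx.addTaskCompletionListener(new TaskCompletionListener {
      override def onTaskCompletion(context: TaskContext): Unit =
        println(context.partitionId())
    })
  }
}

Under 2.11 only the function overload applies to a lambda, which is why these call sites compiled before.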

Contributor: Isn't this a source-breaking change?

Contributor: I think that you can avoid source incompatibilities for Scala users by removing the overloads which accept Scala functions and then adding a package-level implicit conversion to convert from Scala functions back into our own custom trait/interface.

The trickiness here is that you need to preserve binary compatibility on Scala 2.10/2.11, so the removal of the overload needs to be done conditionally so that it only occurs when building with Scala 2.12. Rather than having a separate source tree for 2.12, I'd propose defining the removed overload in a mixin trait which comes from a separate source file, then configuring the build to use different versions of that file for 2.10/2.11 and for 2.12.

For details on this proposal, see https://docs.google.com/document/d/1P_wmH3U356f079AYgSsN53HKixuNdxSEvo8nw_tgLgM/edit, a document that I wrote in March 2016 which explores these source incompatibility difficulties.

Applying that idea here, the proposal would be to remove the method

def addTaskCompletionListener(f: (TaskContext) => Unit)

and add a package-level implicit conversion from TaskContext => Unit to TaskCompletionListener, but to do this only in the 2.12 source tree / shim. This approach has some caveats and could potentially impact Java users who are doing weird things (violating the goal that Java Spark code is source and binary compatible with all Scala versions). See the linked doc for a full discussion of this problem.
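A rough sketch of such a shim, with hypothetical names and placement (this is not code from the PR or the linked doc):

import scala.language.implicitConversions

import org.apache.spark.TaskContext
import org.apache.spark.util.TaskCompletionListener

// Hypothetical 2.12-only source file: with the (TaskContext => Unit) overload removed,
// an implicit conversion in a commonly imported scope keeps existing Scala call sites compiling.
object TaskContextShims {
  implicit def functionToTaskCompletionListener(f: TaskContext => Unit): TaskCompletionListener =
    new TaskCompletionListener {
      override def onTaskCompletion(context: TaskContext): Unit = f(context)
    }
}

With that conversion in scope, call sites that pass an existing TaskContext => Unit value would still resolve against the single remaining TaskCompletionListener overload.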

override def onTaskCompletion(context: TaskContext): Unit =
TaskContextSuite.completed = true
})
sys.error("failed")
}
}
@@ -95,9 +98,13 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
test("all TaskCompletionListeners should be called even if some fail") {
val context = TaskContext.empty()
val listener = mock(classOf[TaskCompletionListener])
context.addTaskCompletionListener(_ => throw new Exception("blah"))
context.addTaskCompletionListener(new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = throw new Exception("blah")
})
context.addTaskCompletionListener(listener)
context.addTaskCompletionListener(_ => throw new Exception("blah"))
context.addTaskCompletionListener(new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = throw new Exception("blah")
})

intercept[TaskCompletionListenerException] {
context.markTaskCompleted(None)
@@ -109,9 +116,15 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
test("all TaskFailureListeners should be called even if some fail") {
val context = TaskContext.empty()
val listener = mock(classOf[TaskFailureListener])
context.addTaskFailureListener((_, _) => throw new Exception("exception in listener1"))
context.addTaskFailureListener(new TaskFailureListener {
override def onTaskFailure(context: TaskContext, error: Throwable): Unit =
throw new Exception("exception in listener1")
})
context.addTaskFailureListener(listener)
context.addTaskFailureListener((_, _) => throw new Exception("exception in listener3"))
context.addTaskFailureListener(new TaskFailureListener {
override def onTaskFailure(context: TaskContext, error: Throwable): Unit =
throw new Exception("exception in listener3")
})

val e = intercept[TaskCompletionListenerException] {
context.markTaskFailed(new Exception("exception in task"))
@@ -232,7 +245,10 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
var invocations = 0
val context = TaskContext.empty()
context.markTaskCompleted(None)
context.addTaskCompletionListener(_ => invocations += 1)
context.addTaskCompletionListener(new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit =
invocations += 1
})
assert(invocations == 1)
context.markTaskCompleted(None)
assert(invocations == 1)
@@ -244,10 +260,12 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val error = new RuntimeException
val context = TaskContext.empty()
context.markTaskFailed(error)
context.addTaskFailureListener { (_, e) =>
lastError = e
invocations += 1
}
context.addTaskFailureListener(new TaskFailureListener {
override def onTaskFailure(context: TaskContext, e: Throwable): Unit = {
lastError = e
invocations += 1
}
})
assert(lastError == error)
assert(invocations == 1)
context.markTaskFailed(error)
@@ -267,9 +285,15 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
test("all TaskCompletionListeners should be called even if some fail or a task") {
val context = TaskContext.empty()
val listener = mock(classOf[TaskCompletionListener])
context.addTaskCompletionListener(_ => throw new Exception("exception in listener1"))
context.addTaskCompletionListener(new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit =
throw new Exception("exception in listener1")
})
context.addTaskCompletionListener(listener)
context.addTaskCompletionListener(_ => throw new Exception("exception in listener3"))
context.addTaskCompletionListener(new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit =
throw new Exception("exception in listener3")
})

val e = intercept[TaskCompletionListenerException] {
context.markTaskCompleted(Some(new Exception("exception in task")))
@@ -24,7 +24,7 @@ import scala.collection.mutable.HashMap
import org.mockito.Matchers.{anyInt, anyObject, anyString, eq => meq}
import org.mockito.Mockito.{atLeast, atMost, never, spy, times, verify, when}
import org.scalatest.BeforeAndAfterEach
import org.scalatest.mock.MockitoSugar
import org.scalatest.mockito.MockitoSugar

import org.apache.spark._
import org.apache.spark.internal.Logging
@@ -19,7 +19,7 @@ package org.apache.spark.storage

import org.mockito.Matchers
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.mockito.MockitoSugar

import org.apache.spark.SparkFunSuite
import org.apache.spark.memory.MemoryMode.ON_HEAP
2 changes: 2 additions & 0 deletions dev/create-release/release-build.sh
@@ -349,6 +349,8 @@ if [[ "$1" == "publish-release" ]]; then
# Clean-up Zinc nailgun process
/usr/sbin/lsof -P |grep $ZINC_PORT | grep LISTEN | awk '{ print $2; }' | xargs kill

#./dev/change-scala-version.sh 2.11

pushd $tmp_repo/org/apache/spark

# Remove any extra files generated during install
6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-2.6
@@ -28,8 +28,8 @@ breeze_2.11-0.13.2.jar
calcite-avatica-1.2.0-incubating.jar
calcite-core-1.2.0-incubating.jar
calcite-linq4j-1.2.0-incubating.jar
chill-java-0.8.0.jar
chill_2.11-0.8.0.jar
chill-java-0.8.4.jar
chill_2.11-0.8.4.jar
commons-beanutils-1.7.0.jar
commons-beanutils-core-1.8.0.jar
commons-cli-1.2.jar
@@ -168,7 +168,7 @@ scala-compiler-2.11.8.jar
scala-library-2.11.8.jar
scala-parser-combinators_2.11-1.0.4.jar
scala-reflect-2.11.8.jar
scala-xml_2.11-1.0.2.jar
scala-xml_2.11-1.0.5.jar
scalap-2.11.8.jar
shapeless_2.11-2.3.2.jar
slf4j-api-1.7.16.jar
6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-2.7
@@ -28,8 +28,8 @@ breeze_2.11-0.13.2.jar
calcite-avatica-1.2.0-incubating.jar
calcite-core-1.2.0-incubating.jar
calcite-linq4j-1.2.0-incubating.jar
chill-java-0.8.0.jar
chill_2.11-0.8.0.jar
chill-java-0.8.4.jar
chill_2.11-0.8.4.jar
commons-beanutils-1.7.0.jar
commons-beanutils-core-1.8.0.jar
commons-cli-1.2.jar
@@ -169,7 +169,7 @@ scala-compiler-2.11.8.jar
scala-library-2.11.8.jar
scala-parser-combinators_2.11-1.0.4.jar
scala-reflect-2.11.8.jar
scala-xml_2.11-1.0.2.jar
scala-xml_2.11-1.0.5.jar
scalap-2.11.8.jar
shapeless_2.11-2.3.2.jar
slf4j-api-1.7.16.jar
@@ -30,7 +30,7 @@ import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
import org.scalatest.concurrent.Eventually
import org.scalatest.mock.MockitoSugar
import org.scalatest.mockito.MockitoSugar

import org.apache.spark.streaming.{Duration, TestSuiteBase}
import org.apache.spark.util.ManualClock
@@ -17,13 +17,10 @@

package org.apache.spark.streaming.kinesis

import java.lang.IllegalArgumentException

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.scalatest.BeforeAndAfterEach
import org.scalatest.mock.MockitoSugar
import org.scalatest.mockito.MockitoSugar

import org.apache.spark.SparkFunSuite
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, TestSuiteBase}

@@ -28,7 +28,7 @@ import org.mockito.Matchers._
import org.mockito.Matchers.{eq => meq}
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.mock.MockitoSugar
import org.scalatest.mockito.MockitoSugar

import org.apache.spark.streaming.{Duration, TestSuiteBase}

@@ -231,17 +231,17 @@ String getScalaVersion() {
return scala;
}
String sparkHome = getSparkHome();
//File scala212 = new File(sparkHome, "launcher/target/scala-2.12");
File scala212 = new File(sparkHome, "launcher/target/scala-2.12");
File scala211 = new File(sparkHome, "launcher/target/scala-2.11");
//checkState(!scala210.isDirectory() || !scala211.isDirectory(),
// "Presence of build for multiple Scala versions detected.\n" +
// "Either clean one of them or set SPARK_SCALA_VERSION in your environment.");
//if (scala212.isDirectory()) {
// return "2.12";
//} else {
checkState(scala211.isDirectory(), "Cannot find any build directories.");
return "2.11";
//}
checkState(!scala212.isDirectory() || !scala211.isDirectory(),
"Presence of build for multiple Scala versions detected.\n" +
"Either clean one of them or set SPARK_SCALA_VERSION in your environment.");
if (scala212.isDirectory()) {
return "2.12";
} else {
checkState(scala211.isDirectory(), "Cannot find any build directories.");
return "2.11";
}
}

String getSparkHome() {