From 2c101932a7b94ff8b4aa14b09bea6728da4a4bdd Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 14 Mar 2016 12:35:17 -0700
Subject: [PATCH 1/7] Remove assembly in tests.

---
 dev/run-tests.py | 15 +-------
 .../launcher/AbstractCommandBuilder.java | 36 ++++++++++++++++---
 python/run-tests.py | 27 +++++++++++---
 3 files changed, 56 insertions(+), 22 deletions(-)

diff --git a/dev/run-tests.py b/dev/run-tests.py
index a1e6f1bdb560..3ba13d0da23c 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -323,7 +323,7 @@ def get_hadoop_profiles(hadoop_version):
 def build_spark_maven(hadoop_version):
     # Enable all of the profiles for the build:
     build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
-    mvn_goals = ["clean", "package", "-DskipTests"]
+    mvn_goals = ["clean", "package", "-DskipTests", "-pl", "!assembly"]
     profiles_and_goals = build_profiles + mvn_goals

     print("[info] Building Spark (w/Hive 1.2.1) using Maven with these arguments: ",
@@ -349,16 +349,6 @@ def build_spark_sbt(hadoop_version):
     exec_sbt(profiles_and_goals)


-def build_spark_assembly_sbt(hadoop_version):
-    # Enable all of the profiles for the build:
-    build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
-    sbt_goals = ["assembly/assembly"]
-    profiles_and_goals = build_profiles + sbt_goals
-    print("[info] Building Spark assembly (w/Hive 1.2.1) using SBT with these arguments: ",
-          " ".join(profiles_and_goals))
-    exec_sbt(profiles_and_goals)
-
-
 def build_apache_spark(build_tool, hadoop_version):
     """Will build Spark against Hive v1.2.1 given the passed in build tool (either `sbt` or
     `maven`). Defaults to using `sbt`."""
@@ -574,9 +564,6 @@ def main():
     if build_tool == "sbt":
         # Note: compatibility tests only supported in sbt for now
         detect_binary_inop_with_mima()
-        # Since we did not build assembly/assembly before running dev/mima, we need to
-        # do it here because the tests still rely on it; see SPARK-13294 for details.
-        build_spark_assembly_sbt(hadoop_version)

     # run the test suites
     run_scala_tests(build_tool, hadoop_version, test_modules, excluded_tags)
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index f6c7e07654ee..605fd35d6cb0 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -144,10 +144,38 @@ List<String> buildClassPath(String appClassPath) throws IOException {
     boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
     if (prependClasses || isTesting) {
       String scala = getScalaVersion();
-      List<String> projects = Arrays.asList("core", "repl", "mllib", "graphx",
-        "streaming", "tools", "sql/catalyst", "sql/core", "sql/hive", "sql/hive-thriftserver",
-        "yarn", "launcher",
-        "common/network-common", "common/network-shuffle", "common/network-yarn");
+      // All projects except assemblies:
+      List<String> projects = Arrays.asList(
+        "common/network-common",
+        "common/network-shuffle",
+        "common/network-yarn",
+        "common/sketch",
+        "common/tags",
+        "common/unsafe",
+        "core",
+        "examples",
+        "external/akka",
+        "external/docker-integration-tests",
+        "external/flume",
+        "external/flume-sink",
+        "external/kafka",
+        "external/kinesis-asl",
+        "external/mqtt",
+        "external/spark-ganglia-lgpl",
+        "external/twitter",
+        "external/zeromq",
+        "graphx",
+        "launcher",
+        "mllib",
+        "repl",
+        "sql/catalyst",
+        "sql/core",
+        "sql/hive",
+        "sql/hive-thriftserver",
+        "streaming",
+        "tools",
+        "yarn"
+      );
       if (prependClasses) {
         if (!isTesting) {
           System.err.println(
diff --git a/python/run-tests.py b/python/run-tests.py
index a9f8854e6f66..6f99d8969333 100755
--- a/python/run-tests.py
+++ b/python/run-tests.py
@@ -54,10 +54,27 @@ def print_red(text):
 LOGGER = logging.getLogger()


-def run_individual_python_test(test_name, pyspark_python):
+def get_spark_dist_classpath():
+    original_working_dir = os.getcwd()
+    os.chdir(SPARK_HOME)
+    cp = subprocess_check_output(
+        ["./build/sbt", "export assembly/managedClasspath"], universal_newlines=True)
+    cp = cp.strip().split("\n")[-1]
+    os.chdir(original_working_dir)
+    return cp
+
+
+def run_individual_python_test(test_name, pyspark_python, spark_dist_classpath):
     env = dict(os.environ)
-    env.update({'SPARK_TESTING': '1', 'PYSPARK_PYTHON': which(pyspark_python),
-                'PYSPARK_DRIVER_PYTHON': which(pyspark_python)})
+    env.update({
+        # Setting SPARK_DIST_CLASSPATH is a simple way to make sure that any child processes
+        # launched by the tests have access to the correct test-time classpath.
+        'SPARK_DIST_CLASSPATH': spark_dist_classpath,
+        'SPARK_TESTING': '1',
+        'SPARK_PREPEND_CLASSES': '1',
+        'PYSPARK_PYTHON': which(pyspark_python),
+        'PYSPARK_DRIVER_PYTHON': which(pyspark_python),
+    })
     LOGGER.debug("Starting test(%s): %s", pyspark_python, test_name)
     start_time = time.time()
     try:
@@ -175,6 +192,8 @@ def main():
                 priority = 100
             task_queue.put((priority, (python_exec, test_goal)))

+    spark_dist_classpath = get_spark_dist_classpath()
+
     def process_queue(task_queue):
         while True:
             try:
@@ -182,7 +201,7 @@ def process_queue(task_queue):
             except Queue.Empty:
                 break
             try:
-                run_individual_python_test(test_goal, python_exec)
+                run_individual_python_test(test_goal, python_exec, spark_dist_classpath)
             finally:
                 task_queue.task_done()

From 267aaf9f1cdb133b54cfb09d411b1477f1d2f878 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 14 Mar 2016 14:07:40 -0700
Subject: [PATCH 2/7] Fix ClassNotFoundException errors in PySpark streaming tests.

---
 python/pyspark/streaming/tests.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index f4bbb1b12872..5766d1e0882d 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1636,7 +1636,13 @@ def search_kinesis_asl_assembly_jar():
     jars = "%s,%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar,
                                mqtt_test_jar, kinesis_asl_assembly_jar)
-    os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
+    # We need to set userClassPathFirst here because the streaming data source classes are also
+    # loadable from the root classloader (because of SPARK_PREPEND_CLASSES) but their dependencies
+    # are only present in the data source assembly JARs.
+    os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([
+        "--conf spark.driver.userClassPathFirst=true",
+        "--jars %s pyspark-shell" % jars,
+    ])

     testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
                  KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests,
                  StreamingListenerTests]

From e6e3c206d0ab6979e34194d85784ad6563b3546e Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 14 Mar 2016 14:48:04 -0700
Subject: [PATCH 3/7] Revert "Fix ClassNotFoundException errors in PySpark streaming tests."

This reverts commit 267aaf9f1cdb133b54cfb09d411b1477f1d2f878.
---
 python/pyspark/streaming/tests.py | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 5766d1e0882d..f4bbb1b12872 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1636,13 +1636,7 @@ def search_kinesis_asl_assembly_jar():
     jars = "%s,%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar,
                                mqtt_test_jar, kinesis_asl_assembly_jar)
-    # We need to set userClassPathFirst here because the streaming data source classes are also
-    # loadable from the root classloader (because of SPARK_PREPEND_CLASSES) but their dependencies
-    # are only present in the data source assembly JARs.
-    os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([
-        "--conf spark.driver.userClassPathFirst=true",
-        "--jars %s pyspark-shell" % jars,
-    ])
+    os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars

     testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
                  KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests,
                  StreamingListenerTests]

From 1154eb47be5d7059dfc7309c68545f32e997f6e6 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 14 Mar 2016 14:48:34 -0700
Subject: [PATCH 4/7] Remove external/ projects from SPARK_PREPEND_CLASSES

---
 .../apache/spark/launcher/AbstractCommandBuilder.java | 11 -----------
 1 file changed, 11 deletions(-)

diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index 605fd35d6cb0..374bc70dd9db 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -144,7 +144,6 @@ List<String> buildClassPath(String appClassPath) throws IOException {
     boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
     if (prependClasses || isTesting) {
       String scala = getScalaVersion();
-      // All projects except assemblies:
       List<String> projects = Arrays.asList(
         "common/network-common",
         "common/network-shuffle",
@@ -154,16 +153,6 @@ List<String> buildClassPath(String appClassPath) throws IOException {
         "common/unsafe",
         "core",
         "examples",
-        "external/akka",
-        "external/docker-integration-tests",
-        "external/flume",
-        "external/flume-sink",
-        "external/kafka",
-        "external/kinesis-asl",
-        "external/mqtt",
-        "external/spark-ganglia-lgpl",
-        "external/twitter",
-        "external/zeromq",
         "graphx",
         "launcher",
         "mllib",

From 1fd489ce6fdfa5a134ad1eae94f548f67b988554 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 14 Mar 2016 21:39:23 -0700
Subject: [PATCH 5/7] Enable Hive in PySpark test deps.

---
 python/run-tests.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/run-tests.py b/python/run-tests.py
index 6f99d8969333..25cc859ed6a9 100755
--- a/python/run-tests.py
+++ b/python/run-tests.py
@@ -58,7 +58,7 @@ def get_spark_dist_classpath():
     original_working_dir = os.getcwd()
     os.chdir(SPARK_HOME)
     cp = subprocess_check_output(
-        ["./build/sbt", "export assembly/managedClasspath"], universal_newlines=True)
+        ["./build/sbt", "-Phive", "export assembly/managedClasspath"], universal_newlines=True)
     cp = cp.strip().split("\n")[-1]
     os.chdir(original_working_dir)
     return cp

From 20d2e2a864ce7b4696a600bd5f68dafe6357c537 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 16 Mar 2016 12:14:20 -0700
Subject: [PATCH 6/7] Tolerate missing SPARK_JARS_DIR in tests.

---
 bin/spark-class | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/bin/spark-class b/bin/spark-class
index e710e388be1b..7a45ffe001d2 100755
--- a/bin/spark-class
+++ b/bin/spark-class
@@ -43,14 +43,14 @@ else
   SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION"
 fi

-if [ ! -d "$SPARK_JARS_DIR" ]; then
+if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING" ]; then
   echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
   echo "You need to build Spark before running this program." 1>&2
   exit 1
+else
+  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
 fi

-LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
-
 # Add the launcher build dir to the classpath if requested.
 if [ -n "$SPARK_PREPEND_CLASSES" ]; then
   LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"

From 0900b13055923b653174770fa4e26d61af2c8a53 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 16 Mar 2016 13:30:41 -0700
Subject: [PATCH 7/7] Continue to build assembly in Maven tests (for now)

---
 dev/run-tests.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dev/run-tests.py b/dev/run-tests.py
index 3ba13d0da23c..a4a99e7ba3b2 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -323,7 +323,7 @@ def get_hadoop_profiles(hadoop_version):
 def build_spark_maven(hadoop_version):
     # Enable all of the profiles for the build:
     build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
-    mvn_goals = ["clean", "package", "-DskipTests", "-pl", "!assembly"]
+    mvn_goals = ["clean", "package", "-DskipTests"]
     profiles_and_goals = build_profiles + mvn_goals

     print("[info] Building Spark (w/Hive 1.2.1) using Maven with these arguments: ",