From 756837d3e13e4ae77452cab44e1b7cffd09aefba Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Tue, 28 Jul 2020 18:41:03 -0700
Subject: [PATCH 1/6] Add configs to switch allow/disallow to create SparkContext in executors.

---
 .../scala/org/apache/spark/SparkContext.scala |  6 +++--
 .../spark/internal/config/package.scala       |  7 +++++
 .../org/apache/spark/SparkContextSuite.scala  |  9 +++++++
 python/pyspark/context.py                     |  5 ++--
 python/pyspark/tests/test_context.py          | 11 ++++++++
 .../org/apache/spark/sql/SparkSession.scala   | 12 ++++++---
 .../spark/sql/SparkSessionBuilderSuite.scala  | 26 ++++++++++++++++++-
 7 files changed, 67 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 06abc0541a9a..9ecf316beeaa 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -83,8 +83,10 @@ class SparkContext(config: SparkConf) extends Logging {
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
 
-  // In order to prevent SparkContext from being created in executors.
-  SparkContext.assertOnDriver()
+  if (!config.get(ALLOW_SPARK_CONTEXT_IN_EXECUTORS)) {
+    // In order to prevent SparkContext from being created in executors.
+    SparkContext.assertOnDriver()
+  }
 
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having started construction.
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e1b598e67049..fdc9253ce9b0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1908,4 +1908,11 @@ package object config {
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val ALLOW_SPARK_CONTEXT_IN_EXECUTORS =
+    ConfigBuilder("spark.driver.allowSparkContextInExecutors")
+      .doc("If set to true, SparkContext can be created in executors.")
+      .version("3.0.1")
+      .booleanConf
+      .createWithDefault(false)
 }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 132e994c37a5..1f7aa8eec894 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -946,6 +946,15 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
 
     assert(error.contains("SparkContext should only be created and accessed on the driver."))
   }
+
+  test("SPARK-32160: Allow to create SparkContext in executors if the config is set") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+
+    sc.range(0, 1).foreach { _ =>
+      new SparkContext(new SparkConf().setAppName("test").setMaster("local")
+        .set(ALLOW_SPARK_CONTEXT_IN_EXECUTORS, true)).stop()
+    }
+  }
 }
 
 object SparkContextSuite {
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 2e105cc38260..801f4b42ba64 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -117,8 +117,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         ...
         ValueError:...
         """
-        # In order to prevent SparkContext from being created in executors.
-        SparkContext._assert_on_driver()
+        if conf is None or not conf.get("spark.python.allowSparkContextInExecutors", "false"):
+            # In order to prevent SparkContext from being created in executors.
+            SparkContext._assert_on_driver()
 
         self._callsite = first_spark_call() or CallSite(None, None, None)
         if gateway is not None and gateway.gateway_parameters.auth_token is None:
diff --git a/python/pyspark/tests/test_context.py b/python/pyspark/tests/test_context.py
index 168299e385e7..73790246e433 100644
--- a/python/pyspark/tests/test_context.py
+++ b/python/pyspark/tests/test_context.py
@@ -275,6 +275,17 @@ def test_disallow_to_create_spark_context_in_executors(self):
             self.assertIn("SparkContext should only be created and accessed on the driver.",
                           str(context.exception))
 
+    def test_allow_to_create_spark_context_in_executors(self):
+        # SPARK-32160: SparkContext can be created in executors if the config is set.
+
+        def create_spark_context():
+            conf = SparkConf().set("spark.python.allowSparkContextInExecutors", "true")
+            with SparkContext(conf=conf):
+                pass
+
+        with SparkContext("local-cluster[3, 1, 1024]") as sc:
+            sc.range(2).foreach(lambda _: create_spark_context())
+
 
 class ContextTestsWithResources(unittest.TestCase):
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 08b0a1c6a60a..306c3235b0bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, TaskContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ALLOW_SPARK_CONTEXT_IN_EXECUTORS
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalog.Catalog
@@ -900,7 +901,13 @@ object SparkSession extends Logging {
      * @since 2.0.0
      */
     def getOrCreate(): SparkSession = synchronized {
-      assertOnDriver()
+      val sparkConf = new SparkConf()
+      options.foreach { case (k, v) => sparkConf.set(k, v) }
+
+      if (!sparkConf.get(ALLOW_SPARK_CONTEXT_IN_EXECUTORS)) {
+        assertOnDriver()
+      }
+
       // Get the session from current thread's active session.
       var session = activeThreadSession.get()
       if ((session ne null) && !session.sparkContext.isStopped) {
@@ -919,9 +926,6 @@ object SparkSession extends Logging {
 
       // No active nor global default session. Create a new one.
       val sparkContext = userSuppliedContext.getOrElse {
-        val sparkConf = new SparkConf()
-        options.foreach { case (k, v) => sparkConf.set(k, v) }
-
         // set a random app name if not given.
         if (!sparkConf.contains("spark.app.name")) {
           sparkConf.setAppName(java.util.UUID.randomUUID().toString)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index e914d8398e92..cc261a9ed359 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql
 
 import org.scalatest.BeforeAndAfterEach
 
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark.internal.config.ALLOW_SPARK_CONTEXT_IN_EXECUTORS
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -257,4 +258,27 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
       context.stop()
     }
   }
+
+  test("SPARK-32160: Disallow to create SparkSession in executors") {
+    val session = SparkSession.builder().master("local-cluster[3, 1, 1024]").getOrCreate()
+
+    val error = intercept[SparkException] {
+      session.range(1).foreach { v =>
+        SparkSession.builder.master("local").getOrCreate()
+        ()
+      }
+    }.getMessage()
+
+    assert(error.contains("SparkSession should only be created and accessed on the driver."))
+  }
+
+  test("SPARK-32160: Allow to create SparkSession in executors if the config is set") {
+    val session = SparkSession.builder().master("local-cluster[3, 1, 1024]").getOrCreate()
+
+    session.range(1).foreach { v =>
+      SparkSession.builder.master("local")
+        .config(ALLOW_SPARK_CONTEXT_IN_EXECUTORS.key, true).getOrCreate().stop()
+      ()
+    }
+  }
 }

From 77a086a5298984ebf0d362fa66b0b9e500614737 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Wed, 29 Jul 2020 15:49:42 -0700
Subject: [PATCH 2/6] Fix.

---
 python/pyspark/context.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 801f4b42ba64..c5cbc7ed52e8 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -117,7 +117,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         ...
         ValueError:...
         """
-        if conf is None or not conf.get("spark.python.allowSparkContextInExecutors", "false"):
+        if (conf is None or
+                conf.get("spark.python.allowSparkContextInExecutors", "false").lower() != "true"):
             # In order to prevent SparkContext from being created in executors.
             SparkContext._assert_on_driver()
 

From 1c1b7ec202ee4544f51b846230ec12bef608beb7 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Wed, 29 Jul 2020 16:34:23 -0700
Subject: [PATCH 3/6] Add a migration guide.

---
 docs/core-migration-guide.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 63baef145f01..9ee7ad66f54a 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -22,6 +22,10 @@ license: |
 * Table of contents
 {:toc}
 
+## Upgrading from Core 3.0 to 3.1
+
+- In Spark 3.0 and below, `SparkContext` can be created in executors. Since Spark 3.1, an exception will be thrown when creating `SparkContext` in executors. If you need to create `SparkContext` in executors, you can allow it by setting the configuration `spark.driver.allowSparkContextInExecutors` in Scala/Java executors or `spark.python.allowSparkContextInExecutors` in PySpark executors.
+
 ## Upgrading from Core 2.4 to 3.0
 
 - The `org.apache.spark.ExecutorPlugin` interface and related configuration has been replaced with

From 9feb7e1cffc51e53f1ed815a769bb46defb769d5 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Wed, 29 Jul 2020 22:26:08 -0700
Subject: [PATCH 4/6] Use a single config.

---
 docs/core-migration-guide.md         | 2 +-
 python/pyspark/context.py            | 2 +-
 python/pyspark/tests/test_context.py | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 9ee7ad66f54a..b2a08502d0d6 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -24,7 +24,7 @@ license: |
 
 ## Upgrading from Core 3.0 to 3.1
 
-- In Spark 3.0 and below, `SparkContext` can be created in executors. Since Spark 3.1, an exception will be thrown when creating `SparkContext` in executors. If you need to create `SparkContext` in executors, you can allow it by setting the configuration `spark.driver.allowSparkContextInExecutors` in Scala/Java executors or `spark.python.allowSparkContextInExecutors` in PySpark executors.
+- In Spark 3.0 and below, `SparkContext` can be created in executors. Since Spark 3.1, an exception will be thrown when creating `SparkContext` in executors. You can allow it by setting the configuration `spark.driver.allowSparkContextInExecutors` when creating `SparkContext` in executors.
 
 ## Upgrading from Core 2.4 to 3.0
 
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index c5cbc7ed52e8..379f5a4e430c 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -118,7 +118,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         ValueError:...
         """
         if (conf is None or
-                conf.get("spark.python.allowSparkContextInExecutors", "false").lower() != "true"):
+                conf.get("spark.driver.allowSparkContextInExecutors", "false").lower() != "true"):
             # In order to prevent SparkContext from being created in executors.
             SparkContext._assert_on_driver()
 
diff --git a/python/pyspark/tests/test_context.py b/python/pyspark/tests/test_context.py
index 73790246e433..64fe3837e769 100644
--- a/python/pyspark/tests/test_context.py
+++ b/python/pyspark/tests/test_context.py
@@ -279,7 +279,7 @@ def test_allow_to_create_spark_context_in_executors(self):
         # SPARK-32160: SparkContext can be created in executors if the config is set.
 
         def create_spark_context():
-            conf = SparkConf().set("spark.python.allowSparkContextInExecutors", "true")
+            conf = SparkConf().set("spark.driver.allowSparkContextInExecutors", "true")
             with SparkContext(conf=conf):
                 pass
 

From ce5860f7c115ecc2cbf214c5a35f0460dfe32999 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Thu, 30 Jul 2020 10:35:08 -0700
Subject: [PATCH 5/6] Rerun tests.

From 0ea7ea956b3867b67c869fddcc294cb268157a92 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Thu, 30 Jul 2020 18:53:01 -0700
Subject: [PATCH 6/6] Rerun tests.
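
Usage sketch (illustrative, not part of the patch series): a minimal driver program exercising the same path as the new tests above, assuming the single `spark.driver.allowSparkContextInExecutors` key from PATCH 4/6. The object name, app names, and `local-cluster[3, 1, 1024]` master are illustrative placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AllowSparkContextInExecutorsExample {
  def main(args: Array[String]): Unit = {
    // Driver-side SparkContext; local-cluster mirrors the masters used in the tests above.
    val sc = new SparkContext(
      new SparkConf().setAppName("driver-app").setMaster("local-cluster[3, 1, 1024]"))

    sc.range(0, 1).foreach { _ =>
      // This constructor runs inside an executor task. Without the config below it fails with
      // "SparkContext should only be created and accessed on the driver."
      val executorSideSc = new SparkContext(
        new SparkConf()
          .setAppName("executor-side-app")
          .setMaster("local")
          .set("spark.driver.allowSparkContextInExecutors", "true"))
      executorSideSc.stop()
    }

    sc.stop()
  }
}
```

PySpark behaves the same way after PATCH 4/6: set the same key on the `SparkConf` passed to the `SparkContext` constructed inside the task, as `test_allow_to_create_spark_context_in_executors` does above.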