Skip to content

Commit f8094fb

Browse files
ericlMykhailo Shtelma
authored andcommitted
[SPARK-23809][SQL] Active SparkSession should be set by getOrCreate
## What changes were proposed in this pull request? Currently, the active spark session is set inconsistently (e.g., in createDataFrame, prior to query execution). Many places in spark also incorrectly query active session when they should be calling activeSession.getOrElse(defaultSession) and so might get None even if a Spark session exists. The semantics here can be cleaned up if we also set the active session when the default session is set. Related: https://github.com/apache/spark/pull/20926/files ## How was this patch tested? Unit test, existing test. Note that if apache#20926 merges first we should also update the tests there. Author: Eric Liang <[email protected]> Closes apache#20927 from ericl/active-session-cleanup.
1 parent 02bc4f6 commit f8094fb

File tree

4 files changed

+35
-1
lines changed

4 files changed

+35
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -951,7 +951,8 @@ object SparkSession {
951951

952952
session = new SparkSession(sparkContext, None, None, extensions)
953953
options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
954-
defaultSession.set(session)
954+
setDefaultSession(session)
955+
setActiveSession(session)
955956

956957
// Register a successfully instantiated context to the singleton. This should be at the
957958
// end of the class definition so that the singleton is updated only if there is no
@@ -1027,6 +1028,17 @@ object SparkSession {
10271028
*/
10281029
def getDefaultSession: Option[SparkSession] = Option(defaultSession.get)
10291030

1031+
/**
1032+
* Returns the currently active SparkSession, otherwise the default one. If there is no default
1033+
* SparkSession, throws an exception.
1034+
*
1035+
* @since 2.4.0
1036+
*/
1037+
def active: SparkSession = {
1038+
getActiveSession.getOrElse(getDefaultSession.getOrElse(
1039+
throw new IllegalStateException("No active or default Spark session found")))
1040+
}
1041+
10301042
////////////////////////////////////////////////////////////////////////////////////////
10311043
// Private methods from now on
10321044
////////////////////////////////////////////////////////////////////////////////////////

sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,24 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
5050
assert(SparkSession.builder().getOrCreate() == session)
5151
}
5252

53+
test("sets default and active session") {
54+
assert(SparkSession.getDefaultSession == None)
55+
assert(SparkSession.getActiveSession == None)
56+
val session = SparkSession.builder().master("local").getOrCreate()
57+
assert(SparkSession.getDefaultSession == Some(session))
58+
assert(SparkSession.getActiveSession == Some(session))
59+
}
60+
61+
test("get active or default session") {
62+
val session = SparkSession.builder().master("local").getOrCreate()
63+
assert(SparkSession.active == session)
64+
SparkSession.clearActiveSession()
65+
assert(SparkSession.active == session)
66+
SparkSession.clearDefaultSession()
67+
intercept[IllegalStateException](SparkSession.active)
68+
session.stop()
69+
}
70+
5371
test("config options are propagated to existing SparkSession") {
5472
val session1 = SparkSession.builder().master("local").config("spark-config1", "a").getOrCreate()
5573
assert(session1.conf.get("spark-config1") == "a")

sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ private[spark] class TestSparkSession(sc: SparkContext) extends SparkSession(sc)
3535
}
3636

3737
SparkSession.setDefaultSession(this)
38+
SparkSession.setActiveSession(this)
3839

3940
@transient
4041
override lazy val sessionState: SessionState = {

sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,9 @@ private[hive] class TestHiveSparkSession(
179179
loadTestTables)
180180
}
181181

182+
SparkSession.setDefaultSession(this)
183+
SparkSession.setActiveSession(this)
184+
182185
{ // set the metastore temporary configuration
183186
val metastoreTempConf = HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false) ++ Map(
184187
ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",

0 commit comments

Comments
 (0)