Skip to content
Closed
Prev Previous commit
Next Next commit
Simplify implementation and add more comments
  • Loading branch information
Changgyoo Park committed Jun 19, 2024
commit 5da47114fc9b76c6ccc78c8a3f6ca875153c5f09
Original file line number Diff line number Diff line change
Expand Up @@ -832,16 +832,12 @@ object SparkSession extends Logging {
* they are not set yet or the associated [[SparkConnectClient]] is unusable.
*/
private def setDefaultAndActiveSession(session: SparkSession): Unit = {
var currentDefault = defaultSession.getAcquire
var swapped = false
while ((currentDefault == null || !currentDefault.client.isSessionValid) && !swapped) {
// Update `defaultSession` if it is null or the contained session is not valid.
val result = defaultSession.compareAndExchangeRelease(currentDefault, session)
if (result == currentDefault) {
swapped = true
} else {
currentDefault = result
}
val currentDefault = defaultSession.getAcquire
if (currentDefault == null || !currentDefault.client.isSessionValid) {
// Update `defaultSession` if it is null or the contained session is not valid. There is a
// chance that the following `compareAndSet` fails if a new default session has just been set,
// but that does not matter since that event has happened after this method was invoked.
defaultSession.compareAndSet(currentDefault, session)
}
if (getActiveSession.isEmpty) {
setActiveSession(session)
Expand Down Expand Up @@ -1036,7 +1032,9 @@ object SparkSession extends Logging {
val session = tryCreateSessionFromClient()
.getOrElse({
var existingSession = sessions.get(builder.configuration)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a lock here for sessions?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so as Cache is backed by ConcurrentMap which allows concurrent access to the data. Source: https://guava.dev/releases/17.0/api/docs/com/google/common/cache/CacheBuilder.html.

while (!existingSession.client.isSessionValid) {
if (!existingSession.client.isSessionValid) {
// If the cached session has become invalid, e.g., due to a server restart, this creates
// a new one and returns it.
sessions.refresh(builder.configuration)
existingSession = sessions.get(builder.configuration)
}
Expand All @@ -1049,7 +1047,8 @@ object SparkSession extends Logging {
}

/**
* Returns the default SparkSession.
* Returns the default SparkSession. If the previously set default SparkSession becomes
* unusable, returns None.
*
* @since 3.5.0
*/
Expand All @@ -1075,7 +1074,8 @@ object SparkSession extends Logging {
}

/**
* Returns the active SparkSession for the current thread.
* Returns the active SparkSession for the current thread. If the previously set active
* SparkSession becomes unusable, returns None.
*
* @since 3.5.0
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {

test("SPARK-47986: get or create after session changed") {
val remote = s"sc://localhost:$serverPort"

SparkSession.clearDefaultSession()
SparkSession.clearActiveSession()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ private[sql] class SparkConnectClient(
}

/**
* Returns true if the session is valid on both the client and the server.
* Returns true if the session is valid on both the client and the server. A session becomes
* invalid if the server side information about the client, e.g., session ID, does not
* correspond to the actual client state.
*/
private[sql] def isSessionValid: Boolean = {
// The last known state of the session is stored in `responseValidator`, because it is where the
Expand Down