
Conversation


@ghost ghost commented Jun 18, 2024

What changes were proposed in this pull request?

This is a Scala port of #46221 and #46435.

A client is unaware of a server restart, or of the server having closed the client, until it receives an error. At that point, however, the client is unable to create a new session to the same Connect endpoint, because the stale session is still recorded as the active and default session.

With this change, when the server communicates via a GRPC error that the session has changed, the session and the respective client are marked as stale, thereby allowing a new default session to be created via the session builder.

In some cases, particularly when running against older Spark clusters (3.5), the error manifests as a mismatch in the observed server-side session ID between calls. This fix captures and handles that case as well.
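The mechanism described above can be sketched roughly as follows; the class and method names here are illustrative assumptions (only `sessionInvalidated` and `hasSessionChanged` appear in the actual change), not the real implementation:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative sketch of the client-side staleness flag (names assumed).
class ClientSketch {
  // Set once the server reports [INVALID_HANDLE.SESSION_CHANGED] via a GRPC error.
  private val sessionInvalidated = new AtomicBoolean(false)

  def hasSessionChanged: Boolean = sessionInvalidated.get()
  def markSessionInvalidated(): Unit = sessionInvalidated.set(true)
}

// The builder then ignores any cached session whose client has been invalidated,
// so a subsequent getOrCreate() builds a fresh session for the same endpoint.
```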

Why are the changes needed?

Without this fix, a client cannot use getOrCreate() to obtain a working session after such an error, which is unacceptable.

Does this PR introduce any user-facing change?

No

How was this patch tested?

./build/sbt "testOnly *SparkSessionE2ESuite"

Was this patch authored or co-authored using generative AI tooling?

No

@ghost ghost changed the title [SPARK-47986][CONNECT][SCALA] Unable to create a new session when the default session is closed by the server [SPARK-47986][CONNECT][FOLLOW-UP] Unable to create a new session when the default session is closed by the server Jun 18, 2024
 val session = tryCreateSessionFromClient()
-  .getOrElse(sessions.get(builder.configuration))
+  .getOrElse({
+    var existingSession = sessions.get(builder.configuration)
Member

Do we need a lock here for sessions?

Author

I don't think so, as Cache is backed by a ConcurrentMap, which allows concurrent access to the data. Source: https://guava.dev/releases/17.0/api/docs/com/google/common/cache/CacheBuilder.html.
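As a quick illustration of that thread-safety property (a hedged sketch; the actual cache in the codebase is keyed and configured differently):

```scala
import com.google.common.cache.CacheBuilder

// Guava caches are backed by a ConcurrentMap-like structure, so gets and
// puts from multiple threads need no external lock.
val sessions = CacheBuilder.newBuilder()
  .weakValues()
  .build[String, String]()

// Safe to call concurrently from several threads:
sessions.put("config-a", "session-1")
val hit = Option(sessions.getIfPresent("config-a")) // Some("session-1")
```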

  */
-  def getDefaultSession: Option[SparkSession] = Option(defaultSession.get())
+  def getDefaultSession: Option[SparkSession] =
+    Option(defaultSession.get()).filterNot(s => s.client != null && s.client.hasSessionChanged)
Member

I wonder if we can use the same naming with the Python side so we can easily land the same fix at both sides in the future.

Author

Sure, I'll look into the Python code and try to harmonise the naming with it.

Author

There are no Scala counterparts for is_stopped or is_closed, unfortunately, so I named it isSessionValid instead (not public).

@HyukjinKwon
Member

Looks fine, but it would be great to have a look from @nija-at and @juliuszsompolski

Contributor

@nija-at nija-at left a comment


Overall LGTM.

case e: StatusRuntimeException
    if e.getStatus.getCode == Status.Code.INTERNAL &&
      e.getMessage.contains("[INVALID_HANDLE.SESSION_CHANGED]") =>
  sessionInvalidated.setRelease(true)
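For the older-server case described in the PR summary, where the problem surfaces as a mismatch in the observed server-side session ID rather than an explicit SESSION_CHANGED error, the same flag can be set when validation fails. A minimal sketch, assuming a hypothetical `serverSideSessionId` field alongside the `sessionInvalidated` flag (not the exact implementation):

```scala
// Sketch (field and method names are assumptions): detect a server restart on
// an older (3.5) cluster by comparing the server-side session ID across calls.
private var serverSideSessionId: Option[String] = None

private def validateServerSideSessionId(observedId: String): Unit =
  serverSideSessionId match {
    case Some(known) if known != observedId =>
      // The server replaced the session without raising SESSION_CHANGED:
      // mark this client as stale so the builder can create a fresh session.
      sessionInvalidated.setRelease(true)
    case None =>
      serverSideSessionId = Some(observedId)
    case _ => // IDs match; nothing to do.
  }
```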
Contributor

Should we also reset the serverSideSessionId?

Author

I don't think so because serverSideSessionId will be updated to the new value if we reset it here. Maybe @nemanja-boric-databricks wants to double check this if I missed something here.

@ghost
Author

ghost commented Jun 19, 2024


======================================================================
ERROR [3.579s]: test_crossvalidator_on_pipeline (pyspark.ml.tests.connect.test_connect_tuning.CrossValidatorTestsOnConnect.test_crossvalidator_on_pipeline)
----------------------------------------------------------------------

This failure really doesn't seem to be related to this change.

@HyukjinKwon
Member

Merged to master.

@ghost ghost deleted the SPARK-47986 branch June 20, 2024 09:07