Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,25 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
assert(reattachableIter.resultComplete)
}

test("SPARK-48056: Client execute gets INVALID_HANDLE.SESSION_NOT_FOUND and proceeds") {
startDummyServer(0)
client = SparkConnectClient
.builder()
.connectionString(s"sc://localhost:${server.getPort}")
.enableReattachableExecute()
.build()
service.errorToThrowOnExecute = Some(
new StatusRuntimeException(
Status.INTERNAL.withDescription("INVALID_HANDLE.SESSION_NOT_FOUND")))

val plan = buildPlan("select * from range(1)")
val iter = client.execute(plan)
val reattachableIter =
ExecutePlanResponseReattachableIterator.fromIterator(iter)
reattachableIter.foreach(_ => ())
assert(reattachableIter.resultComplete)
}

test("GRPC stub unary call throws error immediately") {
// Spark Connect error retry handling depends on the error being returned from the unary
// call immediately.
Expand Down Expand Up @@ -609,6 +628,8 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer
private val inputArtifactRequests: mutable.ListBuffer[AddArtifactsRequest] =
mutable.ListBuffer.empty

var errorToThrowOnExecute: Option[Throwable] = None

private[sql] def getAndClearLatestInputPlan(): proto.Plan = {
val plan = inputPlan
inputPlan = null
Expand All @@ -624,6 +645,13 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer
override def executePlan(
request: ExecutePlanRequest,
responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
if (errorToThrowOnExecute.isDefined) {
val error = errorToThrowOnExecute.get
errorToThrowOnExecute = None
responseObserver.onError(error)
return
}

// Reply with a dummy response using the same client ID
val requestSessionId = request.getSessionId
val operationId = if (request.hasOperationId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ import org.apache.spark.sql.connect.client.GrpcRetryHandler.RetryException
* ReattachExecute request. ReattachExecute request is provided the responseId of last returned
* ExecutePlanResponse on the iterator to return a new iterator from server that continues after
* that. If the initial ExecutePlan did not even reach the server, and hence reattach fails with
* INVALID_HANDLE.OPERATION_NOT_FOUND, we attempt to retry ExecutePlan.
* INVALID_HANDLE.OPERATION_NOT_FOUND or INVALID_HANDLE.SESSION_NOT_FOUND, we attempt to retry
* ExecutePlan.
*
* In reattachable execute the server does buffer some responses in case the client needs to
* backtrack. To let server release this buffer sooner, this iterator asynchronously sends
Expand All @@ -66,7 +67,8 @@ class ExecutePlanResponseReattachableIterator(
// Add operation id, if not present.
// with operationId set by the client, the client can use it to try to reattach on error
// even before getting the first response. If the operation in fact didn't even reach the
// server, that will end with INVALID_HANDLE.OPERATION_NOT_FOUND error.
// server, that will end with INVALID_HANDLE.OPERATION_NOT_FOUND or
// INVALID_HANDLE.SESSION_NOT_FOUND error.
UUID.randomUUID.toString
}

Expand Down Expand Up @@ -234,10 +236,14 @@ class ExecutePlanResponseReattachableIterator(
} catch {
case ex: StatusRuntimeException
if Option(StatusProto.fromThrowable(ex))
.exists(_.getMessage.contains("INVALID_HANDLE.OPERATION_NOT_FOUND")) =>
.exists(ex => {
ex.getMessage.contains("INVALID_HANDLE.OPERATION_NOT_FOUND") ||
ex.getMessage.contains("INVALID_HANDLE.SESSION_NOT_FOUND")
}) =>
if (lastReturnedResponseId.isDefined) {
throw new IllegalStateException(
"OPERATION_NOT_FOUND on the server but responses were already received from it.",
"OPERATION_NOT_FOUND/SESSION_NOT_FOUND on the server but responses were already " +
"received from it.",
ex)
}
// Try a new ExecutePlan, and throw upstream for retry.
Expand Down