Skip to content

Commit fdce3e2

Browse files
committed
Fix race issue
1 parent 33b8406 commit fdce3e2

File tree

4 files changed

+41
-1
lines changed

4 files changed

+41
-1
lines changed

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,11 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
193193
return
194194
}
195195

196+
// SPARK-53339: Post the Started event here, right after the CAS succeeds, to ensure that
197+
// postStarted() is never called when interrupt() has already transitioned the state to
198+
// interrupted. This eliminates the race between postStarted() and interrupt().
199+
executeHolder.eventsManager.postStarted()
200+
196201
// `withSession` ensures that session-specific artifacts (such as JARs and class files) are
197202
// available during processing.
198203
executeHolder.sessionHolder.withSession { session =>

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,11 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
248248
* Post [[org.apache.spark.sql.connect.service.SparkListenerConnectOperationCanceled]].
249249
*/
250250
def postCanceled(): Unit = {
251+
// SPARK-53339: Pending is included to handle the case where interrupt() is called before
252+
// postStarted() transitions the status from Pending to Started.
251253
assertStatus(
252254
List(
255+
ExecuteStatus.Pending,
253256
ExecuteStatus.Started,
254257
ExecuteStatus.Analyzed,
255258
ExecuteStatus.ReadyForExecution,
@@ -269,8 +272,11 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
269272
* The message of the error thrown during the request.
270273
*/
271274
def postFailed(errorMessage: String): Unit = {
275+
// SPARK-53339: Pending is included to handle the case where postStarted() itself throws
276+
// an exception (e.g., session state check failure) before transitioning from Pending.
272277
assertStatus(
273278
List(
279+
ExecuteStatus.Pending,
274280
ExecuteStatus.Started,
275281
ExecuteStatus.Analyzed,
276282
ExecuteStatus.ReadyForExecution,

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,6 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
191191
responseObserver: StreamObserver[proto.ExecutePlanResponse]): ExecuteHolder = {
192192
val executeHolder = createExecuteHolder(executeKey, request, sessionHolder)
193193
try {
194-
executeHolder.eventsManager.postStarted()
195194
executeHolder.start()
196195
} catch {
197196
// Errors raised before the execution holder has finished spawning a thread are considered

sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,36 @@ class ExecuteEventsManagerSuite
138138
.isInstanceOf[SparkListenerConnectOperationCanceled])
139139
}
140140

141+
test("SPARK-53339: post canceled from Pending state") {
142+
val events = setupEvents(ExecuteStatus.Pending)
143+
events.postCanceled()
144+
assert(events.status == ExecuteStatus.Canceled)
145+
assert(events.terminationReason.contains(TerminationReason.Canceled))
146+
}
147+
148+
test("SPARK-53339: post failed from Pending state") {
149+
val events = setupEvents(ExecuteStatus.Pending)
150+
events.postFailed(DEFAULT_ERROR)
151+
assert(events.status == ExecuteStatus.Failed)
152+
assert(events.terminationReason.contains(TerminationReason.Failed))
153+
}
154+
155+
test("SPARK-53339: Pending to Canceled to Closed transition") {
156+
val events = setupEvents(ExecuteStatus.Pending)
157+
events.postCanceled()
158+
events.postClosed()
159+
assert(events.status == ExecuteStatus.Closed)
160+
assert(events.terminationReason.contains(TerminationReason.Canceled))
161+
}
162+
163+
test("SPARK-53339: Pending to Failed to Closed transition") {
164+
val events = setupEvents(ExecuteStatus.Pending)
165+
events.postFailed(DEFAULT_ERROR)
166+
events.postClosed()
167+
assert(events.status == ExecuteStatus.Closed)
168+
assert(events.terminationReason.contains(TerminationReason.Failed))
169+
}
170+
141171
test("SPARK-43923: post failed") {
142172
val events = setupEvents(ExecuteStatus.Started)
143173
events.postFailed(DEFAULT_ERROR)

0 commit comments

Comments
 (0)