fix hang and cancellation
juliuszsompolski committed Jul 12, 2023
commit 20cb3dd1835bcef9f9288fa0dc5f3b9f7a2cafd8
@@ -33,30 +33,31 @@ import org.apache.spark.util.Utils

 private[connect] class ExecuteRunner(executeHolder: ExecuteHolder) extends Logging {
 
+  // The newly created thread will inherit all InheritableThreadLocals used by Spark,
+  // e.g. SparkContext.localProperties. If considering implementing a threadpool,
+  // forwarding of thread locals needs to be taken into account.
   private var executionThread: Thread = new ExecutionThread()
 
   private var interrupted: Boolean = false
 
   def start(): Unit = {
-    // The newly created thread will inherit all InheritableThreadLocals used by Spark,
-    // e.g. SparkContext.localProperties. If considering implementing a threadpool,
-    // forwarding of thread locals needs to be taken into account.
-    this.executionThread.start()
+    // synchronized in case of interrupt while starting.
+    synchronized {
+      this.executionThread.start()
+    }
   }
 
   private def execute(): Unit = {
-    // Outer execute
+    // Outer execute handles errors.
     try {
       try {
-        execute()
+        executeInternal()
       } catch {
-        case NonFatal(e) =>
+        // Need to catch throwable instead of NonFatal, because e.g. InterruptedException is fatal.
+        case e: Throwable =>
           logDebug(s"Exception in execute: $e")
           // Always cancel all remaining execution after error.
           executeHolder.sessionHolder.session.sparkContext.cancelJobsWithTag(executeHolder.jobTag)
           // Rely on an internal interrupted flag, because Thread.interrupted() could be cleared,
           // and different exceptions like InterruptedException, ClosedByInterruptException etc.
           // could be thrown.
           if (interrupted) {
             // Turn the interrupt into OPERATION_CANCELLED error.
             throw new SparkSQLException("OPERATION_CANCELLED", Map.empty)
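
The InheritableThreadLocal comment above is doing real work. A minimal sketch of the behavior it describes, not from this patch: the object and value names (InheritableLocalsSketch, local, "job-tag-42") are illustrative, with local standing in for SparkContext.localProperties. A Thread constructed after a value is set inherits it, while a pool worker created earlier does not, which is why a threadpool would need explicit forwarding.

import java.util.concurrent.Executors

object InheritableLocalsSketch {
  // Stand-in for SparkContext.localProperties, which is an InheritableThreadLocal.
  private val local = new InheritableThreadLocal[String] {
    override def initialValue(): String = "unset"
  }

  def main(args: Array[String]): Unit = {
    // Force the pool to create its single worker *before* the value is set.
    val pool = Executors.newFixedThreadPool(1)
    pool.submit(new Runnable { def run(): Unit = () }).get()

    local.set("job-tag-42")

    // A Thread constructed now snapshots the parent's inheritable locals.
    val fresh = new Thread(() => println(s"fresh thread:  ${local.get()}")) // job-tag-42
    fresh.start()
    fresh.join()

    // The pre-existing pool worker never inherited the value; forwarding would
    // mean capturing local.get() at submit time and re-setting it in the task.
    pool.submit(new Runnable {
      def run(): Unit = println(s"pooled thread: ${local.get()}") // unset
    }).get()
    pool.shutdown()
  }
}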
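
The catch-Throwable and interrupted-flag comments compress two subtle facts: scala.util.control.NonFatal deliberately classifies InterruptedException as fatal, so a `case NonFatal(e)` handler would let a cancellation kill the thread without any OPERATION_CANCELLED being raised; and the JVM interrupt status can already be cleared, surfacing as different exception types, by the time the handler runs. A hedged sketch of the resulting pattern, with illustrative names (CancellationSketch, run) rather than the PR's actual classes:

object CancellationSketch {
  @volatile private var interrupted = false

  // Set the flag before delivering the JVM interrupt, so the catch block below
  // is guaranteed to observe it once any interrupt-induced exception surfaces.
  def interrupt(worker: Thread): Unit = {
    interrupted = true
    worker.interrupt()
  }

  def run(body: => Unit): Unit = {
    try {
      body
    } catch {
      // `case NonFatal(e)` would not match here: scala.util.control.NonFatal
      // treats InterruptedException as fatal, so it would escape this handler.
      case e: Throwable =>
        // Thread.interrupted() may already be cleared, and the exception type
        // varies (InterruptedException, ClosedByInterruptException, ...), so the
        // explicit flag is the only reliable record of an intentional interrupt.
        if (interrupted) println("operation cancelled") // ~ OPERATION_CANCELLED
        else throw e
    }
  }

  def main(args: Array[String]): Unit = {
    val worker = new Thread(() => run { Thread.sleep(60000) }) // a "hung" body
    worker.start()
    interrupt(worker) // maps the interrupt to a cancellation message
    worker.join()
  }
}

Ordering the flag write before worker.interrupt() is what makes the flag trustworthy: any exception the interrupt provokes can only be observed after the flag is already visible.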