From 8269081e6bf8811e95dd96dd4344589e2c0eadbb Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 30 Jul 2024 17:41:44 +0800
Subject: [PATCH 1/3] Do not block the AQE loop when submitting query stages

---
 .../spark/sql/internal/StaticSQLConf.scala    |  10 ++
 .../execution/adaptive/QueryStageExec.scala   |   4 +-
 .../exchange/BroadcastExchangeExec.scala      |  20 +++-
 .../sql/execution/exchange/Exchange.scala     |  13 ---
 .../exchange/ShuffleExchangeExec.scala        |  87 +++++++++++----
 .../sql/SparkSessionExtensionSuite.scala      |   4 +-
 .../adaptive/AdaptiveQueryExecSuite.scala     | 105 ++++++------------
 7 files changed, 128 insertions(+), 115 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index aaeac8ce6fce..fa4423a7df86 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -170,6 +170,16 @@ object StaticSQLConf {
       .intConf
       .createWithDefault(1000)
 
+  val SHUFFLE_EXCHANGE_MAX_THREAD_THRESHOLD =
+    buildStaticConf("spark.sql.shuffleExchange.maxThreadThreshold")
+      .internal()
+      .doc("The maximum degree of parallelism for the preparation of a shuffle exchange, " +
+        "which includes subquery execution, file listing, etc.")
+      .version("4.0.0")
+      .intConf
+      .checkValue(thres => thres > 0 && thres <= 1024, "The threshold must be in (0,1024].")
+      .createWithDefault(1024)
+
   val BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD =
     buildStaticConf("spark.sql.broadcastExchange.maxThreadThreshold")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index bbc36197f92e..51595e20ae5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -194,7 +194,7 @@ case class ShuffleQueryStageExec(
 
   def advisoryPartitionSize: Option[Long] = shuffle.advisoryPartitionSize
 
-  override protected def doMaterialize(): Future[Any] = shuffle.submitShuffleJob
+  override protected def doMaterialize(): Future[Any] = shuffle.submitShuffleJob()
 
   override def newReuseInstance(
       newStageId: Int, newOutput: Seq[Attribute]): ExchangeQueryStageExec = {
@@ -240,7 +240,7 @@ case class BroadcastQueryStageExec(
     throw SparkException.internalError(s"wrong plan for broadcast stage:\n ${plan.treeString}")
   }
 
-  override protected def doMaterialize(): Future[Any] = broadcast.submitBroadcastJob
+  override protected def doMaterialize(): Future[Any] = broadcast.submitBroadcastJob()
 
   override def newReuseInstance(
       newStageId: Int, newOutput: Seq[Attribute]): ExchangeQueryStageExec = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index d6eaded95835..092eabd91e18 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -61,23 +61,33 @@ trait BroadcastExchangeLike extends Exchange {
    */
   def relationFuture: Future[broadcast.Broadcast[Any]]
 
+  @transient
+  private lazy val triggerFuture: Future[Any] = {
+    SQLExecution.withThreadLocalCaptured(session,
BroadcastExchangeExec.executionContext) { + // Trigger broadcast preparation which can involve expensive operations like waiting on + // subqueries and file listing. + executeQuery(null) + null + } + } + + protected def completionFuture: scala.concurrent.Future[broadcast.Broadcast[Any]] + /** * The asynchronous job that materializes the broadcast. It's used for registering callbacks on * `relationFuture`. Note that calling this method may not start the execution of broadcast job. * It also does the preparations work, such as waiting for the subqueries. */ - final def submitBroadcastJob: scala.concurrent.Future[broadcast.Broadcast[Any]] = executeQuery { - materializationStarted.set(true) + final def submitBroadcastJob(): scala.concurrent.Future[broadcast.Broadcast[Any]] = { + triggerFuture completionFuture } - protected def completionFuture: scala.concurrent.Future[broadcast.Broadcast[Any]] - /** * Cancels broadcast job with an optional reason. */ final def cancelBroadcastJob(reason: Option[String]): Unit = { - if (isMaterializationStarted() && !this.relationFuture.isDone) { + if (!this.relationFuture.isDone) { reason match { case Some(r) => sparkContext.cancelJobsWithTag(this.jobTag, r) case None => sparkContext.cancelJobsWithTag(this.jobTag) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala index 154070a954f3..c02beea4f879 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.exchange -import java.util.concurrent.atomic.AtomicBoolean - import org.apache.spark.broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -36,17 +34,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch * "Volcano -- An Extensible and Parallel Query Evaluation System" by Goetz Graefe. */ abstract class Exchange extends UnaryExecNode { - /** - * This flag aims to detect if the stage materialization is started. This helps - * to avoid unnecessary AQE stage materialization when the stage is canceled. 
- */ - protected val materializationStarted = new AtomicBoolean() - - /** - * Exposes status if the materialization is started - */ - def isMaterializationStarted(): Boolean = materializationStarted.get() - override def output: Seq[Attribute] = child.output final override val nodePatterns: Seq[TreePattern] = Seq(EXCHANGE) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 44146181438a..eb356a0e341c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -17,10 +17,11 @@ package org.apache.spark.sql.execution.exchange +import java.util.concurrent.atomic.AtomicReference import java.util.function.Supplier import scala.collection.mutable -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future, Promise} import org.apache.spark._ import org.apache.spark.internal.config @@ -37,8 +38,8 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.MutablePair +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.util.{MutablePair, ThreadUtils} import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator} import org.apache.spark.util.random.XORShiftRandom @@ -46,16 +47,6 @@ import org.apache.spark.util.random.XORShiftRandom * Common trait for all shuffle exchange implementations to facilitate pattern matching. */ trait ShuffleExchangeLike extends Exchange { - - /** - * The asynchronous job that materializes the shuffle. It also does the preparations work, - * such as waiting for the subqueries. - */ - @transient private lazy val shuffleFuture: Future[MapOutputStatistics] = executeQuery { - materializationStarted.set(true) - mapOutputStatisticsFuture - } - /** * Returns the number of mappers of this shuffle. */ @@ -76,26 +67,70 @@ trait ShuffleExchangeLike extends Exchange { */ def shuffleOrigin: ShuffleOrigin + @transient + private lazy val promise = Promise[MapOutputStatistics]() + + @transient + private lazy val completionFuture + : scala.concurrent.Future[MapOutputStatistics] = promise.future + + @transient + private[sql] // Exposed for testing + val futureAction = new AtomicReference[Option[FutureAction[MapOutputStatistics]]](None) + + @transient + private var isCancelled: Boolean = false + + @transient + private lazy val triggerFuture: java.util.concurrent.Future[Any] = { + SQLExecution.withThreadLocalCaptured(session, ShuffleExchangeExec.executionContext) { + try { + // Trigger shuffle preparation which can involve expensive operations like waiting on + // subqueries and file listing. + executeQuery(null) + // Submit shuffle job if not cancelled. 
+ this.synchronized { + if (isCancelled) { + promise.tryFailure(new SparkException("Shuffle cancelled.")) + } else { + val shuffleJob = mapOutputStatisticsFuture + shuffleJob match { + case action: FutureAction[MapOutputStatistics] => futureAction.set(Some(action)) + case _ => + } + promise.completeWith(shuffleJob) + } + } + null + } catch { + case e: Throwable => + promise.tryFailure(e) + throw e + } + } + } + /** - * Submits the shuffle job. + * The asynchronous job that materializes the shuffle. It also does the preparations work, + * such as waiting for the subqueries. */ - final def submitShuffleJob: Future[MapOutputStatistics] = shuffleFuture - - protected def mapOutputStatisticsFuture: Future[MapOutputStatistics] + final def submitShuffleJob(): Future[MapOutputStatistics] = { + triggerFuture + completionFuture + } /** * Cancels the shuffle job with an optional reason. */ - final def cancelShuffleJob(reason: Option[String]): Unit = { - if (isMaterializationStarted()) { - shuffleFuture match { - case action: FutureAction[MapOutputStatistics] if !action.isCompleted => - action.cancel(reason) - case _ => - } + final def cancelShuffleJob(reason: Option[String]): Unit = this.synchronized { + if (!isCancelled) { + isCancelled = true + futureAction.get().foreach(_.cancel(reason)) } } + protected def mapOutputStatisticsFuture: Future[MapOutputStatistics] + /** * Returns the shuffle RDD with specified partition specs. */ @@ -231,6 +266,10 @@ case class ShuffleExchangeExec( object ShuffleExchangeExec { + private[execution] val executionContext = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonCachedThreadPool("shuffle-exchange", + SQLConf.get.getConf(StaticSQLConf.SHUFFLE_EXCHANGE_MAX_THREAD_THRESHOLD))) + /** * Determines whether records must be defensively copied before being sent to the shuffle. * Several of Spark's shuffle components will buffer deserialized Java objects in memory. 
The
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 6779a9d521c0..322210bf5b59 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -1006,7 +1006,7 @@ case class MyShuffleExchangeExec(delegate: ShuffleExchangeExec) extends ShuffleE
     delegate.shuffleOrigin
   }
   override def mapOutputStatisticsFuture: Future[MapOutputStatistics] =
-    delegate.submitShuffleJob
+    delegate.submitShuffleJob()
   override def getShuffleRDD(partitionSpecs: Array[ShufflePartitionSpec]): RDD[_] =
     delegate.getShuffleRDD(partitionSpecs)
   override def runtimeStatistics: Statistics = {
@@ -1032,7 +1032,7 @@ case class MyBroadcastExchangeExec(delegate: BroadcastExchangeExec) extends Broa
   override val runId: UUID = delegate.runId
   override def relationFuture: java.util.concurrent.Future[Broadcast[Any]] =
     delegate.relationFuture
-  override def completionFuture: Future[Broadcast[Any]] = delegate.submitBroadcastJob
+  override def completionFuture: Future[Broadcast[Any]] = delegate.submitBroadcastJob()
   override def runtimeStatistics: Statistics = delegate.runtimeStatistics
   override def child: SparkPlan = delegate.child
   override protected def doPrepare(): Unit = delegate.prepare()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 93df399731d4..fc54e7ecd46d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -910,85 +910,49 @@ class AdaptiveQueryExecSuite
     }
   }
 
-  test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the cancellation") {
+  test("SPARK-47148: AQE should avoid submitting shuffle jobs on cancellation") {
     def createJoinedDF(): DataFrame = {
-      val df = spark.range(5).toDF("col")
-      val df2 = spark.range(10).toDF("col").coalesce(2)
-      val df3 = spark.range(15).toDF("col").filter(Symbol("col") >= 2)
-      df.join(df2, Seq("col")).join(df3, Seq("col"))
+      // Use a subquery expression containing `slow_udf` to delay the submission of shuffle jobs.
+      val df = sql("SELECT id, (SELECT slow_udf() FROM range(2)) FROM range(5)")
+      val df2 = sql("SELECT id FROM range(10)").coalesce(2)
+      val df3 = sql("SELECT id, (SELECT slow_udf() FROM range(2)) FROM range(15) WHERE id > 2")
+      df.join(df2, Seq("id")).join(df3, Seq("id"))
     }
-    try {
-      spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
-      withSQLConf(
-        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
-        val joinedDF = createJoinedDF()
-
-        val error = intercept[SparkException] {
-          joinedDF.collect()
-        }
-        assert(error.getMessage() contains "ProblematicCoalesce execution is failed")
-
-        val adaptivePlan = joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
-
-        // All QueryStages should be based on ShuffleQueryStageExec
-        val shuffleQueryStageExecs = collect(adaptivePlan) {
-          case sqse: ShuffleQueryStageExec => sqse
-        }
-        assert(shuffleQueryStageExecs.length == 3, s"Physical Plan should include " +
-          s"3 ShuffleQueryStages. Physical Plan: $adaptivePlan")
-        shuffleQueryStageExecs.foreach(sqse => assert(sqse.name.contains("ShuffleQueryStageExec-")))
-        // First ShuffleQueryStage is materialized so it needs to be canceled.
-        assert(shuffleQueryStageExecs(0).shuffle.isMaterializationStarted(),
-          "Materialization should be started.")
-        // Second ShuffleQueryStage materialization is failed so
-        // it is excluded from the cancellation due to earlyFailedStage.
-        assert(shuffleQueryStageExecs(1).shuffle.isMaterializationStarted(),
-          "Materialization should be started but it is failed.")
-        // Last ShuffleQueryStage is not materialized yet so it does not require
-        // to be canceled and it is just skipped from the cancellation.
-        assert(!shuffleQueryStageExecs(2).shuffle.isMaterializationStarted(),
-          "Materialization should not be started.")
-      }
-    } finally {
-      spark.experimental.extraStrategies = Nil
-    }
-  }
+    withUserDefinedFunction("slow_udf" -> true) {
+      spark.udf.register("slow_udf", () => {
+        Thread.sleep(3000)
+        1
+      })
 
-  test("SPARK-47148: Check if BroadcastQueryStage materialization is started") {
-    def createJoinedDF(): DataFrame = {
-      spark.range(10).toDF("col1").createTempView("t1")
-      spark.range(5).coalesce(2).toDF("col2").createTempView("t2")
-      spark.range(15).toDF("col3").filter(Symbol("col3") >= 2).createTempView("t3")
-      sql("SELECT /*+ BROADCAST(t3) */ * FROM (SELECT /*+ BROADCAST(t2) */ * FROM t1 " +
-        "INNER JOIN t2 ON t1.col1 = t2.col2) t JOIN t3 ON t.col1 = t3.col3;")
-    }
-    withTempView("t1", "t2", "t3") {
       try {
         spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
-      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
-        val joinedDF = createJoinedDF()
+        withSQLConf(
+          SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+          SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+          val joined = createJoinedDF()
+          joined.explain(true)
           val error = intercept[SparkException] {
-          joinedDF.collect()
+            joined.collect()
           }
-        assert(error.getMessage() contains "ProblematicCoalesce execution is failed")
+          assert(error.getMessage() contains "coalesce test error")
 
-        val adaptivePlan =
-          joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+          val adaptivePlan = joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
 
-        // All QueryStages should be based on BroadcastQueryStageExec
-        val broadcastQueryStageExecs = collect(adaptivePlan) {
-          case bqse: BroadcastQueryStageExec => bqse
-        }
-        assert(broadcastQueryStageExecs.length == 2, adaptivePlan)
-        broadcastQueryStageExecs.foreach { bqse =>
-          assert(bqse.name.contains("BroadcastQueryStageExec-"))
-          // Both BroadcastQueryStages are materialized at the beginning.
-          assert(bqse.broadcast.isMaterializationStarted(),
-            s"${bqse.name}' s materialization should be started.")
+          // All QueryStages should be based on ShuffleQueryStageExec
+          val shuffleQueryStageExecs = collect(adaptivePlan) {
+            case sqse: ShuffleQueryStageExec => sqse
           }
+          assert(shuffleQueryStageExecs.length == 3, s"Physical Plan should include " +
+            s"3 ShuffleQueryStages. Physical Plan: $adaptivePlan")
+          // First ShuffleQueryStage is cancelled before the shuffle job is submitted.
+          assert(shuffleQueryStageExecs(0).shuffle.futureAction.get.isEmpty)
+          // Second ShuffleQueryStage has submitted the shuffle job but it failed.
+          assert(shuffleQueryStageExecs(1).shuffle.futureAction.get.isDefined,
+            "The shuffle job should have been submitted, but it failed.")
+          // Third ShuffleQueryStage is cancelled before the shuffle job is submitted.
+ assert(shuffleQueryStageExecs(2).shuffle.futureAction.get.isEmpty) } } finally { spark.experimental.extraStrategies = Nil @@ -3057,8 +3021,11 @@ private case class SimpleShuffleSortCostEvaluator() extends CostEvaluator { private object TestProblematicCoalesceStrategy extends Strategy { private case class TestProblematicCoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode { - override protected def doExecute(): RDD[InternalRow] = - throw new SparkException("ProblematicCoalesce execution is failed") + override protected def doExecute(): RDD[InternalRow] = { + child.execute().mapPartitions { _ => + throw new RuntimeException("coalesce test error") + } + } override def output: Seq[Attribute] = child.output override protected def withNewChildInternal(newChild: SparkPlan): TestProblematicCoalesceExec = copy(child = newChild) From fbed52528e505780f1686c639ffbcbcb864b6728 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 1 Aug 2024 23:42:47 +0800 Subject: [PATCH 2/3] address comments --- .../exchange/BroadcastExchangeExec.scala | 16 +++++++++++++--- .../execution/exchange/ShuffleExchangeExec.scala | 6 ++++-- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index 092eabd91e18..9707966f75d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -27,7 +27,7 @@ import scala.util.control.NonFatal import org.apache.spark.{broadcast, SparkException} import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.MDC -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{RDD, RDDOperationScope} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.plans.logical.Statistics @@ -61,13 +61,19 @@ trait BroadcastExchangeLike extends Exchange { */ def relationFuture: Future[broadcast.Broadcast[Any]] + @transient + private lazy val promise = Promise[Unit]() + + @transient + private lazy val scalaFuture: scala.concurrent.Future[Unit] = promise.future + @transient private lazy val triggerFuture: Future[Any] = { SQLExecution.withThreadLocalCaptured(session, BroadcastExchangeExec.executionContext) { // Trigger broadcast preparation which can involve expensive operations like waiting on // subqueries and file listing. 
-      executeQuery(null)
-      promise.trySuccess(())
+      try {
+        // Trigger broadcast preparation which can involve expensive operations like waiting on
+        // subqueries and file listing.
+        executeQuery(null)
+        promise.trySuccess(())
+      } catch {
+        case e: Throwable =>
+          promise.tryFailure(e)
+          throw e
+      }
     }
   }
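-- 
The three patches above converge on one pattern: materialization returns
immediately, all expensive preparation (subquery execution, file listing) runs
on a dedicated thread pool, a Promise exposes completion, and cancellation is
reconciled with submission under a lock. Below is a minimal plain-Scala sketch
of that pattern. Every name in it (NonBlockingStage, JobStats, JobHandle,
prepare, runJob) is an illustrative stand-in, not a Spark API; the real
implementations are ShuffleExchangeLike and BroadcastExchangeLike above.

  import java.util.concurrent.atomic.AtomicReference
  import scala.concurrent.{ExecutionContext, Future, Promise}

  // Hypothetical stand-ins for Spark's MapOutputStatistics and FutureAction.
  final case class JobStats(dataSize: Long)
  final class JobHandle(val cancel: Option[String] => Unit)

  final class NonBlockingStage(
      prepare: () => Unit,                          // expensive: subqueries, file listing
      runJob: () => (Future[JobStats], JobHandle))( // submits the actual cluster job
      implicit ec: ExecutionContext) {

    private val promise = Promise[JobStats]()
    private val handle = new AtomicReference[Option[JobHandle]](None)
    private var cancelled = false

    // Started lazily on the first submit(): the caller (the AQE loop) never
    // blocks, because preparation and job submission both run on the pool.
    private lazy val trigger: Future[Unit] = Future {
      try {
        prepare()
        this.synchronized {
          if (cancelled) {
            // Cancelled before submission: fail the promise, never start the job.
            promise.tryFailure(new IllegalStateException("Stage cancelled."))
          } else {
            val (stats, h) = runJob()
            handle.set(Some(h)) // remember the handle so cancel() can reach it
            promise.completeWith(stats)
          }
        }
      } catch {
        case e: Throwable =>
          promise.tryFailure(e)
          throw e
      }
    }

    def submit(): Future[JobStats] = {
      trigger // force the lazy future; idempotent on later calls
      promise.future
    }

    // Safe in any state: flips the flag first, then cancels only a job that
    // was actually submitted.
    def cancel(reason: Option[String]): Unit = this.synchronized {
      if (!cancelled) {
        cancelled = true
        handle.get().foreach(_.cancel(reason))
      }
    }
  }

The synchronized block is the load-bearing piece: without it, a cancel() that
lands between "preparation finished" and "job submitted" could be lost, which
is exactly the window the futureAction AtomicReference and isCancelled flag in
patch 1 close.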