@@ -170,6 +170,16 @@ object StaticSQLConf {
.intConf
.createWithDefault(1000)

val SHUFFLE_EXCHANGE_MAX_THREAD_THRESHOLD =
buildStaticConf("spark.sql.shuffleExchange.maxThreadThreshold")
.internal()
.doc("The maximum degree of parallelism for doing preparation of shuffle exchange, " +
"which includes subquery execution, file listing, etc.")
.version("4.0.0")
.intConf
.checkValue(thres => thres > 0 && thres <= 1024, "The threshold must be in (0,1024].")
.createWithDefault(1024)
@yaooqinn (Member) commented on Jul 30, 2024:
Can you explain why we pick this number? It might create memory pressure on the driver

The PR author (Contributor) replied:

The shuffle async job just waits for other work (subquery expression execution) to finish, which is very lightweight. The broadcast async job executes a query and collects the result on the driver, which is very heavy. That's why we can give the shuffle async jobs much larger parallelism. In our benchmarks we found this number works reasonably well for TPC.
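
To make the sizing argument concrete, here is a minimal standalone sketch (not Spark's actual pool wiring; the pool sizes and names are illustrative): a shuffle-side task parks on another future and consumes almost nothing, while a broadcast-side task holds collected rows in driver memory.

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object PoolSizingSketch {
  // Shuffle-side preparation mostly blocks waiting on other work, so a large
  // cached pool of such threads is cheap: idle stacks, no busy CPU.
  private val shufflePool =
    ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())

  def prepareShuffle(subqueriesDone: Future[Unit]): Future[Unit] =
    Future(Await.result(subqueriesDone, Duration.Inf))(shufflePool)

  // Broadcast-side work collects query results on the driver, so its pool
  // must stay small to bound driver memory pressure.
  private val broadcastPool =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(16))

  def runBroadcast(collect: () => Array[AnyRef]): Future[Array[AnyRef]] =
    Future(collect())(broadcastPool)
}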

Member:

Is there a correlation with the number of system cores?

The PR author (Contributor) replied:

I don't think so; BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD is also much larger than the number of driver cores.
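
For scale, a hedged illustration (the 128 default for the broadcast threshold is from memory and may vary by Spark version):

val cores = Runtime.getRuntime.availableProcessors() // a driver typically has 8-64
// spark.sql.broadcastExchange.maxThreadThreshold defaults to 128, and the new
// shuffle threshold defaults to 1024: both cap mostly-waiting threads rather
// than sizing a CPU-bound worker pool, so neither tracks the core count.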

Member:

I'm not sure whether this parameter has anything to do with SPARK-49091, or if the issue was just caused by SPARK-41914, which the reporter pointed to.

Also cc @wangyum

Member:

Update: SPARK-49091 is not related


val BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD =
buildStaticConf("spark.sql.broadcastExchange.maxThreadThreshold")
.internal()
@@ -194,7 +194,7 @@ case class ShuffleQueryStageExec(

def advisoryPartitionSize: Option[Long] = shuffle.advisoryPartitionSize

override protected def doMaterialize(): Future[Any] = shuffle.submitShuffleJob
override protected def doMaterialize(): Future[Any] = shuffle.submitShuffleJob()

override def newReuseInstance(
newStageId: Int, newOutput: Seq[Attribute]): ExchangeQueryStageExec = {
Expand Down Expand Up @@ -240,7 +240,7 @@ case class BroadcastQueryStageExec(
throw SparkException.internalError(s"wrong plan for broadcast stage:\n ${plan.treeString}")
}

override protected def doMaterialize(): Future[Any] = broadcast.submitBroadcastJob
override protected def doMaterialize(): Future[Any] = broadcast.submitBroadcastJob()

override def newReuseInstance(
newStageId: Int, newOutput: Seq[Attribute]): ExchangeQueryStageExec = {
@@ -61,23 +61,33 @@ trait BroadcastExchangeLike extends Exchange {
*/
def relationFuture: Future[broadcast.Broadcast[Any]]

@transient
private lazy val triggerFuture: Future[Any] = {
SQLExecution.withThreadLocalCaptured(session, BroadcastExchangeExec.executionContext) {
// Trigger broadcast preparation which can involve expensive operations like waiting on
// subqueries and file listing.
executeQuery(null)
null
}
}

protected def completionFuture: scala.concurrent.Future[broadcast.Broadcast[Any]]

/**
* The asynchronous job that materializes the broadcast. It's used for registering callbacks on
* `relationFuture`. Note that calling this method may not start the execution of broadcast job.
* It also does the preparations work, such as waiting for the subqueries.
*/
final def submitBroadcastJob: scala.concurrent.Future[broadcast.Broadcast[Any]] = executeQuery {
materializationStarted.set(true)
final def submitBroadcastJob(): scala.concurrent.Future[broadcast.Broadcast[Any]] = {
triggerFuture
completionFuture
}

protected def completionFuture: scala.concurrent.Future[broadcast.Broadcast[Any]]

/**
* Cancels broadcast job with an optional reason.
*/
final def cancelBroadcastJob(reason: Option[String]): Unit = {
if (isMaterializationStarted() && !this.relationFuture.isDone) {
if (!this.relationFuture.isDone) {
The PR author (Contributor) commented:

I did not re-implement broadcast cancellation here, as we need more refactoring to move the creation of the Future into BroadcastExchangeLike.
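
A hedged sketch of what that follow-up might look like if the trait owned the trigger future, mirroring the shuffle-side pattern in this PR (names are illustrative, not the actual Spark API):

trait CancellableBroadcastSketch {
  def sparkContext: org.apache.spark.SparkContext
  def jobTag: String

  @transient private var cancelled = false

  // With the trait owning the future, cancellation only needs to flip a flag
  // under the lock; the trigger future would check it before submitting a job.
  final def cancelBroadcastJobSketch(reason: Option[String]): Unit = this.synchronized {
    if (!cancelled) {
      cancelled = true
      reason match {
        case Some(r) => sparkContext.cancelJobsWithTag(jobTag, r)
        case None => sparkContext.cancelJobsWithTag(jobTag)
      }
    }
  }
}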

reason match {
case Some(r) => sparkContext.cancelJobsWithTag(this.jobTag, r)
case None => sparkContext.cancelJobsWithTag(this.jobTag)
@@ -17,8 +17,6 @@

package org.apache.spark.sql.execution.exchange

import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -36,17 +34,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
* "Volcano -- An Extensible and Parallel Query Evaluation System" by Goetz Graefe.
*/
abstract class Exchange extends UnaryExecNode {
/**
* This flag aims to detect if the stage materialization is started. This helps
* to avoid unnecessary AQE stage materialization when the stage is canceled.
*/
protected val materializationStarted = new AtomicBoolean()

/**
* Exposes status if the materialization is started
*/
def isMaterializationStarted(): Boolean = materializationStarted.get()

override def output: Seq[Attribute] = child.output
final override val nodePatterns: Seq[TreePattern] = Seq(EXCHANGE)

@@ -17,10 +17,11 @@

package org.apache.spark.sql.execution.exchange

import java.util.concurrent.atomic.AtomicReference
import java.util.function.Supplier

import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future, Promise}

import org.apache.spark._
import org.apache.spark.internal.config
@@ -37,25 +38,15 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.MutablePair
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.util.{MutablePair, ThreadUtils}
import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator}
import org.apache.spark.util.random.XORShiftRandom

/**
* Common trait for all shuffle exchange implementations to facilitate pattern matching.
*/
trait ShuffleExchangeLike extends Exchange {

/**
* The asynchronous job that materializes the shuffle. It also does the preparations work,
* such as waiting for the subqueries.
*/
@transient private lazy val shuffleFuture: Future[MapOutputStatistics] = executeQuery {
materializationStarted.set(true)
mapOutputStatisticsFuture
}

/**
* Returns the number of mappers of this shuffle.
*/
@@ -76,26 +67,70 @@
*/
def shuffleOrigin: ShuffleOrigin

@transient
private lazy val promise = Promise[MapOutputStatistics]()

@transient
private lazy val completionFuture
: scala.concurrent.Future[MapOutputStatistics] = promise.future

@transient
private[sql] // Exposed for testing
val futureAction = new AtomicReference[Option[FutureAction[MapOutputStatistics]]](None)

@transient
private var isCancelled: Boolean = false

@transient
private lazy val triggerFuture: java.util.concurrent.Future[Any] = {
SQLExecution.withThreadLocalCaptured(session, ShuffleExchangeExec.executionContext) {
try {
// Trigger shuffle preparation which can involve expensive operations like waiting on
// subqueries and file listing.
executeQuery(null)
// Submit shuffle job if not cancelled.
this.synchronized {
if (isCancelled) {
promise.tryFailure(new SparkException("Shuffle cancelled."))
} else {
val shuffleJob = mapOutputStatisticsFuture
shuffleJob match {
case action: FutureAction[MapOutputStatistics] => futureAction.set(Some(action))
case _ =>
}
promise.completeWith(shuffleJob)
}
}
null
} catch {
case e: Throwable =>
promise.tryFailure(e)
throw e
}
}
}

/**
* Submits the shuffle job.
* The asynchronous job that materializes the shuffle. It also does the preparations work,
* such as waiting for the subqueries.
*/
final def submitShuffleJob: Future[MapOutputStatistics] = shuffleFuture

protected def mapOutputStatisticsFuture: Future[MapOutputStatistics]
final def submitShuffleJob(): Future[MapOutputStatistics] = {
triggerFuture
completionFuture
}

/**
* Cancels the shuffle job with an optional reason.
*/
final def cancelShuffleJob(reason: Option[String]): Unit = {
if (isMaterializationStarted()) {
shuffleFuture match {
case action: FutureAction[MapOutputStatistics] if !action.isCompleted =>
action.cancel(reason)
case _ =>
}
final def cancelShuffleJob(reason: Option[String]): Unit = this.synchronized {
if (!isCancelled) {
isCancelled = true
futureAction.get().foreach(_.cancel(reason))
}
}

protected def mapOutputStatisticsFuture: Future[MapOutputStatistics]

/**
* Returns the shuffle RDD with specified partition specs.
*/
@@ -231,6 +266,10 @@ case class ShuffleExchangeExec(

object ShuffleExchangeExec {

private[execution] val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("shuffle-exchange",
SQLConf.get.getConf(StaticSQLConf.SHUFFLE_EXCHANGE_MAX_THREAD_THRESHOLD)))

/**
* Determines whether records must be defensively copied before being sent to the shuffle.
* Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
@@ -1006,7 +1006,7 @@ case class MyShuffleExchangeExec(delegate: ShuffleExchangeExec) extends ShuffleExchangeLike
delegate.shuffleOrigin
}
override def mapOutputStatisticsFuture: Future[MapOutputStatistics] =
delegate.submitShuffleJob
delegate.submitShuffleJob()
override def getShuffleRDD(partitionSpecs: Array[ShufflePartitionSpec]): RDD[_] =
delegate.getShuffleRDD(partitionSpecs)
override def runtimeStatistics: Statistics = {
@@ -1032,7 +1032,7 @@ case class MyBroadcastExchangeExec(delegate: BroadcastExchangeExec) extends BroadcastExchangeLike
override val runId: UUID = delegate.runId
override def relationFuture: java.util.concurrent.Future[Broadcast[Any]] =
delegate.relationFuture
override def completionFuture: Future[Broadcast[Any]] = delegate.submitBroadcastJob
override def completionFuture: Future[Broadcast[Any]] = delegate.submitBroadcastJob()
override def runtimeStatistics: Statistics = delegate.runtimeStatistics
override def child: SparkPlan = delegate.child
override protected def doPrepare(): Unit = delegate.prepare()
@@ -910,85 +910,49 @@ class AdaptiveQueryExecSuite
}
}

test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the cancellation") {
test("SPARK-47148: AQE should avoid to submit shuffle job on cancellation") {
Contributor commented:

This test case seems to be a bit unstable:

[info] - SPARK-47148: AQE should avoid to submit shuffle job on cancellation *** FAILED *** (6 seconds, 94 milliseconds)
[info]   "Multiple failures in stage materialization." did not contain "coalesce test error" (AdaptiveQueryExecSuite.scala:939)
- SPARK-47148: AQE should avoid to submit shuffle job on cancellation *** FAILED ***
  "[SCALAR_SUBQUERY_TOO_MANY_ROWS] More than one row returned by a subquery used as an expression. SQLSTATE: 21000
  == SQL (line 1, position 12) ==
  SELECT id, (SELECT slow_udf() FROM range(2)) FROM range(5)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  " did not contain "coalesce test error" (AdaptiveQueryExecSuite.scala:939)

Any good solutions?

The PR author (Contributor) replied:

I think it's because slow_udf is not slow enough, so the shuffle stage was submitted too early. Can you try increasing the sleep time in slow_udf and see if that fixes the problem?
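
For instance, a hedged version of that tweak (the 10-second value is an assumption, not a tested constant):

spark.udf.register("slow_udf", () => {
  Thread.sleep(10000) // was 3000; a longer delay keeps the later shuffle jobs unsubmitted
  1
})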

def createJoinedDF(): DataFrame = {
val df = spark.range(5).toDF("col")
val df2 = spark.range(10).toDF("col").coalesce(2)
val df3 = spark.range(15).toDF("col").filter(Symbol("col") >= 2)
df.join(df2, Seq("col")).join(df3, Seq("col"))
// Use subquery expression containing `slow_udf` to delay the submission of shuffle jobs.
val df = sql("SELECT id, (SELECT slow_udf() FROM range(2)) FROM range(5)")
val df2 = sql("SELECT id FROM range(10)").coalesce(2)
val df3 = sql("SELECT id, (SELECT slow_udf() FROM range(2)) FROM range(15) WHERE id > 2")
df.join(df2, Seq("id")).join(df3, Seq("id"))
}

try {
spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val joinedDF = createJoinedDF()

val error = intercept[SparkException] {
joinedDF.collect()
}
assert(error.getMessage() contains "ProblematicCoalesce execution is failed")

val adaptivePlan = joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]

// All QueryStages should be based on ShuffleQueryStageExec
val shuffleQueryStageExecs = collect(adaptivePlan) {
case sqse: ShuffleQueryStageExec => sqse
}
assert(shuffleQueryStageExecs.length == 3, s"Physical Plan should include " +
s"3 ShuffleQueryStages. Physical Plan: $adaptivePlan")
shuffleQueryStageExecs.foreach(sqse => assert(sqse.name.contains("ShuffleQueryStageExec-")))
// First ShuffleQueryStage is materialized so it needs to be canceled.
assert(shuffleQueryStageExecs(0).shuffle.isMaterializationStarted(),
"Materialization should be started.")
// Second ShuffleQueryStage materialization is failed so
// it is excluded from the cancellation due to earlyFailedStage.
assert(shuffleQueryStageExecs(1).shuffle.isMaterializationStarted(),
"Materialization should be started but it is failed.")
// Last ShuffleQueryStage is not materialized yet so it does not require
// to be canceled and it is just skipped from the cancellation.
assert(!shuffleQueryStageExecs(2).shuffle.isMaterializationStarted(),
"Materialization should not be started.")
}
} finally {
spark.experimental.extraStrategies = Nil
}
}
withUserDefinedFunction("slow_udf" -> true) {
spark.udf.register("slow_udf", () => {
Thread.sleep(3000)
1
})

test("SPARK-47148: Check if BroadcastQueryStage materialization is started") {
def createJoinedDF(): DataFrame = {
spark.range(10).toDF("col1").createTempView("t1")
spark.range(5).coalesce(2).toDF("col2").createTempView("t2")
spark.range(15).toDF("col3").filter(Symbol("col3") >= 2).createTempView("t3")
sql("SELECT /*+ BROADCAST(t3) */ * FROM (SELECT /*+ BROADCAST(t2) */ * FROM t1 " +
"INNER JOIN t2 ON t1.col1 = t2.col2) t JOIN t3 ON t.col1 = t3.col3;")
}
withTempView("t1", "t2", "t3") {
try {
spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val joinedDF = createJoinedDF()
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val joined = createJoinedDF()
joined.explain(true)

val error = intercept[SparkException] {
joinedDF.collect()
joined.collect()
}
assert(error.getMessage() contains "ProblematicCoalesce execution is failed")
assert(error.getMessage() contains "coalesce test error")

val adaptivePlan =
joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
val adaptivePlan = joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]

// All QueryStages should be based on BroadcastQueryStageExec
val broadcastQueryStageExecs = collect(adaptivePlan) {
case bqse: BroadcastQueryStageExec => bqse
}
assert(broadcastQueryStageExecs.length == 2, adaptivePlan)
broadcastQueryStageExecs.foreach { bqse =>
assert(bqse.name.contains("BroadcastQueryStageExec-"))
// Both BroadcastQueryStages are materialized at the beginning.
assert(bqse.broadcast.isMaterializationStarted(),
s"${bqse.name}' s materialization should be started.")
// All QueryStages should be based on ShuffleQueryStageExec
val shuffleQueryStageExecs = collect(adaptivePlan) {
case sqse: ShuffleQueryStageExec => sqse
}
assert(shuffleQueryStageExecs.length == 3, s"Physical Plan should include " +
s"3 ShuffleQueryStages. Physical Plan: $adaptivePlan")
// First ShuffleQueryStage is cancelled before shuffle job is submitted.
assert(shuffleQueryStageExecs(0).shuffle.futureAction.get.isEmpty)
// Second ShuffleQueryStage has submitted the shuffle job but it failed.
assert(shuffleQueryStageExecs(1).shuffle.futureAction.get.isDefined,
"Materialization should be started but it is failed.")
// Third ShuffleQueryStage is cancelled before shuffle job is submitted.
assert(shuffleQueryStageExecs(2).shuffle.futureAction.get.isEmpty)
}
} finally {
spark.experimental.extraStrategies = Nil
@@ -3057,8 +3021,11 @@ private case class SimpleShuffleSortCostEvaluator() extends CostEvaluator {
private object TestProblematicCoalesceStrategy extends Strategy {
private case class TestProblematicCoalesceExec(numPartitions: Int, child: SparkPlan)
extends UnaryExecNode {
override protected def doExecute(): RDD[InternalRow] =
throw new SparkException("ProblematicCoalesce execution is failed")
override protected def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitions { _ =>
throw new RuntimeException("coalesce test error")
}
}
override def output: Seq[Attribute] = child.output
override protected def withNewChildInternal(newChild: SparkPlan): TestProblematicCoalesceExec =
copy(child = newChild)
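
To close, a brief caller-side sketch of the new contract introduced above; `stage` stands for a hypothetical ShuffleExchangeLike instance:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// submitShuffleJob() kicks off preparation at most once (triggerFuture is lazy)
// and returns the promise-backed completion future.
val statsFuture = stage.submitShuffleJob()
statsFuture.onComplete {
  case Success(stats) => println(s"shuffle wrote ${stats.bytesByPartitionId.length} partitions")
  case Failure(e)     => println(s"shuffle failed or was cancelled: ${e.getMessage}")
}
// Cancelling before the trigger future reaches its synchronized block fails
// the promise with "Shuffle cancelled." instead of launching any job.
stage.cancelShuffleJob(Some("stage no longer needed"))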