Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
assert adjustedTargetExecutors == 1
  • Loading branch information
zhouyifan279 committed Aug 16, 2023
commit c4403eefa6c80bab4a160b747a756329b1193307
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
extensions.injectPlannerStrategy(MaxScanStrategy)

extensions.injectQueryStagePrepRule(FinalStageResourceManager)
extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{ExecutorAllocationClient, MapOutputTrackerMaster, SparkContext, SparkEnv}
import org.apache.spark.internal.Logging
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down Expand Up @@ -217,7 +218,7 @@ case class FinalStageResourceManager(session: SparkSession)
countFailures = false,
force = false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we call client.requestTotalExecutors to adjust the target executor count if targetExecutors < draTargetExecutors after killing executors?

We might face an issue similar to apache/spark#19048.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, we just need to change adjustTargetNumExecutors from true to false and call client.requestTotalExecutors to ensure the final target executor count is as expected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. But building the parameters for client.requestTotalExecutors involves many details,
so I prefer to wait for ExecutorAllocationManager to call client.requestTotalExecutors.

As long as we do not call killExecutors with adjustTargetNumExecutors = true, ExecutorAllocationManager should be able to manage the target executor number correctly.


getAdjustedTargetExecutors(sc, executorAllocationClient)
FinalStageResourceManager.getAdjustedTargetExecutors(sc)
.filter(_ < targetExecutors).foreach { adjustedExecutors =>
val delta = targetExecutors - adjustedExecutors
logInfo(s"Target executors after kill ($adjustedExecutors) is lower than required " +
Expand All @@ -226,10 +227,16 @@ case class FinalStageResourceManager(session: SparkSession)
}
}

private def getAdjustedTargetExecutors(
sc: SparkContext,
executorAllocationClient: ExecutorAllocationClient): Option[Int] = {
executorAllocationClient match {
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
OptimizeSkewInRebalancePartitions,
CoalesceShufflePartitions(session),
OptimizeShuffleWithLocalRead)
}

object FinalStageResourceManager extends Logging {

private[sql] def getAdjustedTargetExecutors(sc: SparkContext): Option[Int] = {
sc.schedulerBackend match {
case schedulerBackend: CoarseGrainedSchedulerBackend =>
try {
val field = classOf[CoarseGrainedSchedulerBackend]
Expand All @@ -249,11 +256,6 @@ case class FinalStageResourceManager(session: SparkSession)
case _ => None
}
}

@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
OptimizeSkewInRebalancePartitions,
CoalesceShufflePartitions(session),
OptimizeShuffleWithLocalRead)
}

trait FinalRebalanceStageHelper extends AdaptiveSparkPlanHelper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class FinalStageResourceManagerSuite extends KyuubiSparkSQLExtensionTest {
sql(
"CREATE TABLE final_stage AS SELECT id, count(*) as num FROM (SELECT 0 id) GROUP BY id")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we call getAdjustedTargetExecutors at the end to make sure the target executor number is 1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

assert(FinalStageResourceManager.getAdjustedTargetExecutors(spark.sparkContext).get == 1)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
extensions.injectPlannerStrategy(MaxScanStrategy)

extensions.injectQueryStagePrepRule(FinalStageResourceManager)
extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{ExecutorAllocationClient, MapOutputTrackerMaster, SparkContext, SparkEnv}
import org.apache.spark.internal.Logging
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down Expand Up @@ -220,7 +221,7 @@ case class FinalStageResourceManager(session: SparkSession)
countFailures = false,
force = false)

getAdjustedTargetExecutors(sc, executorAllocationClient)
FinalStageResourceManager.getAdjustedTargetExecutors(sc)
.filter(_ < targetExecutors).foreach { adjustedExecutors =>
val delta = targetExecutors - adjustedExecutors
logInfo(s"Target executors after kill ($adjustedExecutors) is lower than required " +
Expand All @@ -229,10 +230,16 @@ case class FinalStageResourceManager(session: SparkSession)
}
}

private def getAdjustedTargetExecutors(
sc: SparkContext,
executorAllocationClient: ExecutorAllocationClient): Option[Int] = {
executorAllocationClient match {
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
OptimizeSkewInRebalancePartitions,
CoalesceShufflePartitions(session),
OptimizeShuffleWithLocalRead)
}

object FinalStageResourceManager extends Logging {

private[sql] def getAdjustedTargetExecutors(sc: SparkContext): Option[Int] = {
sc.schedulerBackend match {
case schedulerBackend: CoarseGrainedSchedulerBackend =>
try {
val field = classOf[CoarseGrainedSchedulerBackend]
Expand All @@ -252,11 +259,6 @@ case class FinalStageResourceManager(session: SparkSession)
case _ => None
}
}

@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
OptimizeSkewInRebalancePartitions,
CoalesceShufflePartitions(session),
OptimizeShuffleWithLocalRead)
}

trait FinalRebalanceStageHelper extends AdaptiveSparkPlanHelper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class FinalStageResourceManagerSuite extends KyuubiSparkSQLExtensionTest {
sql(
"CREATE TABLE final_stage AS SELECT id, count(*) as num FROM (SELECT 0 id) GROUP BY id")
}
assert(FinalStageResourceManager.getAdjustedTargetExecutors(spark.sparkContext).get == 1)
}
}
}
Expand Down