Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rename
  • Loading branch information
carsonwang committed Mar 15, 2019
commit 028b0ac8fb31b0f12f2b65a548b4fd623b4088c9
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ private[execution] object SparkPlanInfo {
val children = plan match {
case ReusedExchangeExec(_, child) => child :: Nil
case a: AdaptiveSparkPlanExec => a.finalPlan :: Nil
case fragment: QueryStageExec => fragment.plan :: Nil
case stage: QueryStageExec => stage.plan :: Nil
case _ => plan.children ++ plan.subqueries
}
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class QueryStageManager(
initialPlan: SparkPlan,
session: SparkSession,
callback: QueryStageManagerCallback)
extends EventLoop[QueryStageManagerEvent]("QueryFragmentCreator") {
extends EventLoop[QueryStageManagerEvent]("QueryStageCreator") {

private def conf = session.sessionState.conf

Expand Down Expand Up @@ -201,7 +201,7 @@ case class CreateStageResult(newPlan: SparkPlan, allChildStagesReady: Boolean)

object QueryStageManager {
private val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("QueryFragmentCreator", 16))
ThreadUtils.newDaemonCachedThreadPool("QueryStageCreator", 16))
}

trait QueryStageManagerCallback {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {

override def apply(plan: SparkPlan): SparkPlan = {
val shuffleMetrics: Seq[MapOutputStatistics] = plan.collect {
case fragment: ShuffleQueryStageExec =>
val metricsFuture = fragment.mapOutputStatisticsFuture
assert(metricsFuture.isCompleted, "ShuffleQueryFragment should already be ready")
case stage: ShuffleQueryStageExec =>
val metricsFuture = stage.mapOutputStatisticsFuture
assert(metricsFuture.isCompleted, "ShuffleQueryStageExec should already be ready")
ThreadUtils.awaitResult(metricsFuture, Duration.Zero)
}

Expand Down Expand Up @@ -88,7 +88,7 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {

/**
* Estimates partition start indices for post-shuffle partitions based on
* mapOutputStatistics provided by all pre-shuffle fragments.
* mapOutputStatistics provided by all pre-shuffle stages.
*/
// visible for testing.
private[sql] def estimatePartitionStartIndices(
Expand All @@ -112,7 +112,7 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
s"advisoryTargetPostShuffleInputSize: $advisoryTargetPostShuffleInputSize, " +
s"targetPostShuffleInputSize $targetPostShuffleInputSize.")

// Make sure we do get the same number of pre-shuffle partitions for those fragments.
// Make sure we do get the same number of pre-shuffle partitions for those stages.
val distinctNumPreShufflePartitions =
mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
// The reason that we are expecting a single value of the number of pre-shuffle partitions
Expand All @@ -135,7 +135,7 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {

var i = 0
while (i < numPreShufflePartitions) {
// We calculate the total size of ith pre-shuffle partitions from all pre-shuffle fragments.
// We calculate the total size of ith pre-shuffle partitions from all pre-shuffle stages.
// Then, we add the total size to postShuffleInputSize.
var nextShuffleInputSize = 0L
var j = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ object SparkPlanGraph {
case "InputAdapter" =>
buildSparkPlanGraphNode(
planInfo.children.head, nodeIdGenerator, nodes, edges, parent, null, exchanges)
case "BroadcastQueryFragment" | "ResultQueryFragment" | "ShuffleQueryFragment" =>
case "BroadcastQueryStage" | "ShuffleQueryStage" =>
if (exchanges.contains(planInfo.children.head)) {
// Point to the re-used exchange
val node = exchanges(planInfo.children.head)
Expand Down