diff --git a/docs/extensions/engines/spark/rules.md b/docs/extensions/engines/spark/rules.md index 5c8c0486920..96dcdb832fc 100644 --- a/docs/extensions/engines/spark/rules.md +++ b/docs/extensions/engines/spark/rules.md @@ -63,24 +63,31 @@ Now, you can enjoy the Kyuubi SQL Extension. Kyuubi provides some configs to make these feature easy to use. -| Name | Default Value | Description | Since | -|---------------------------------------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------| -| spark.sql.optimizer.insertRepartitionBeforeWrite.enabled | true | Add repartition node at the top of query plan. An approach of merging small files. | 1.2.0 | -| spark.sql.optimizer.insertRepartitionNum | none | The partition number if `spark.sql.optimizer.insertRepartitionBeforeWrite.enabled` is enabled. If AQE is disabled, the default value is `spark.sql.shuffle.partitions`. If AQE is enabled, the default value is none that means depend on AQE. | 1.2.0 | -| spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum | 100 | The partition number of each dynamic partition if `spark.sql.optimizer.insertRepartitionBeforeWrite.enabled` is enabled. We will repartition by dynamic partition columns to reduce the small file but that can cause data skew. This config is to extend the partition of dynamic partition column to avoid skew but may generate some small files. | 1.2.0 | -| spark.sql.optimizer.forceShuffleBeforeJoin.enabled | false | Ensure shuffle node exists before shuffled join (shj and smj) to make AQE `OptimizeSkewedJoin` works (complex scenario join, multi table join). | 1.2.0 | -| spark.sql.optimizer.finalStageConfigIsolation.enabled | false | If true, the final stage support use different config with previous stage. The prefix of final stage config key should be `spark.sql.finalStage.`. For example, the raw spark config: `spark.sql.adaptive.advisoryPartitionSizeInBytes`, then the final stage config should be: `spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes`. | 1.2.0 | -| spark.sql.analyzer.classification.enabled | false | When true, allows Kyuubi engine to judge this SQL's classification and set `spark.sql.analyzer.classification` back into sessionConf. Through this configuration item, Spark can optimizing configuration dynamic. | 1.4.0 | -| spark.sql.optimizer.insertZorderBeforeWriting.enabled | true | When true, we will follow target table properties to insert zorder or not. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we will insert zorder before writing data. 2) `kyuubi.zorder.cols`: string split by comma, we will zorder by these cols. | 1.4.0 | -| spark.sql.optimizer.zorderGlobalSort.enabled | true | When true, we do a global sort using zorder. Note that, it can cause data skew issue if the zorder columns have less cardinality. When false, we only do local sort using zorder. | 1.4.0 | -| spark.sql.watchdog.maxPartitions | none | Set the max partition number when spark scans a data source. Enable MaxPartitionStrategy by specifying this configuration. 
Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table, it's optional that works with defined | 1.4.0 | -| spark.sql.optimizer.dropIgnoreNonExistent | false | When true, do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies a non-existent database/table/view/function/partition | 1.5.0 | -| spark.sql.optimizer.rebalanceBeforeZorder.enabled | false | When true, we do a rebalance before zorder in case data skew. Note that, if the insertion is dynamic partition we will use the partition columns to rebalance. Note that, this config only affects with Spark 3.3.x. | 1.6.0 | -| spark.sql.optimizer.rebalanceZorderColumns.enabled | false | When true and `spark.sql.optimizer.rebalanceBeforeZorder.enabled` is true, we do rebalance before Z-Order. If it's dynamic partition insert, the rebalance expression will include both partition columns and Z-Order columns. Note that, this config only affects with Spark 3.3.x. | 1.6.0 | -| spark.sql.optimizer.twoPhaseRebalanceBeforeZorder.enabled | false | When true and `spark.sql.optimizer.rebalanceBeforeZorder.enabled` is true, we do two phase rebalance before Z-Order for the dynamic partition write. The first phase rebalance using dynamic partition column; The second phase rebalance using dynamic partition column Z-Order columns. Note that, this config only affects with Spark 3.3.x. | 1.6.0 | -| spark.sql.optimizer.zorderUsingOriginalOrdering.enabled | false | When true and `spark.sql.optimizer.rebalanceBeforeZorder.enabled` is true, we do sort by the original ordering i.e. lexicographical order. Note that, this config only affects with Spark 3.3.x. | 1.6.0 | -| spark.sql.optimizer.inferRebalanceAndSortOrders.enabled | false | When ture, infer columns for rebalance and sort orders from original query, e.g. the join keys from join. It can avoid compression ratio regression. | 1.7.0 | -| spark.sql.optimizer.inferRebalanceAndSortOrdersMaxColumns | 3 | The max columns of inferred columns. | 1.7.0 | -| spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled | false | When true, add repartition even if the original plan does not have shuffle. | 1.7.0 | -| spark.sql.optimizer.finalStageConfigIsolationWriteOnly.enabled | true | When true, only enable final stage isolation for writing. | 1.7.0 | +| Name | Default Value | Description | Since | +|---------------------------------------------------------------------|----------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------| +| spark.sql.optimizer.insertRepartitionBeforeWrite.enabled | true | Add repartition node at the top of query plan. An approach of merging small files. | 1.2.0 | +| spark.sql.optimizer.insertRepartitionNum | none | The partition number if `spark.sql.optimizer.insertRepartitionBeforeWrite.enabled` is enabled. If AQE is disabled, the default value is `spark.sql.shuffle.partitions`. If AQE is enabled, the default value is none that means depend on AQE. | 1.2.0 | +| spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum | 100 | The partition number of each dynamic partition if `spark.sql.optimizer.insertRepartitionBeforeWrite.enabled` is enabled. 
We will repartition by the dynamic partition columns to reduce small files, but that can cause data skew. This config extends the partitioning of the dynamic partition columns to avoid the skew, though it may still generate some small files. | 1.2.0 |
+| spark.sql.optimizer.forceShuffleBeforeJoin.enabled | false | Ensure a shuffle node exists before shuffled joins (shj and smj) so that AQE `OptimizeSkewedJoin` works (complex scenario joins, multi-table joins). | 1.2.0 |
+| spark.sql.optimizer.finalStageConfigIsolation.enabled | false | If true, the final stage supports using different configs from previous stages. The prefix of a final stage config key should be `spark.sql.finalStage.`. For example, for the raw Spark config `spark.sql.adaptive.advisoryPartitionSizeInBytes`, the final stage config is `spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes`. | 1.2.0 |
+| spark.sql.analyzer.classification.enabled | false | When true, allows the Kyuubi engine to judge this SQL's classification and set `spark.sql.analyzer.classification` back into sessionConf. Through this configuration item, Spark can optimize configurations dynamically. | 1.4.0 |
+| spark.sql.optimizer.insertZorderBeforeWriting.enabled | true | When true, we follow the target table properties to decide whether to insert zorder. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we insert zorder before writing data. 2) `kyuubi.zorder.cols`: a comma-separated string; we zorder by these columns. | 1.4.0 |
+| spark.sql.optimizer.zorderGlobalSort.enabled | true | When true, we do a global sort using zorder. Note that it can cause a data skew issue if the zorder columns have low cardinality. When false, we only do a local sort using zorder. | 1.4.0 |
+| spark.sql.watchdog.maxPartitions | none | Set the max partition number when Spark scans a data source. Specifying this configuration enables MaxPartitionStrategy, which avoids scanning excessive partitions of a partitioned table. It is optional and only takes effect when this value is defined. | 1.4.0 |
+| spark.sql.optimizer.dropIgnoreNonExistent | false | When true, do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies a non-existent database/table/view/function/partition. | 1.5.0 |
+| spark.sql.optimizer.rebalanceBeforeZorder.enabled | false | When true, we do a rebalance before zorder in case of data skew. Note that if the insertion is a dynamic partition write, we use the partition columns to rebalance. Note that this config only takes effect with Spark 3.3.x. | 1.6.0 |
+| spark.sql.optimizer.rebalanceZorderColumns.enabled | false | When true and `spark.sql.optimizer.rebalanceBeforeZorder.enabled` is true, we do a rebalance before Z-Order. If it is a dynamic partition insert, the rebalance expression will include both the partition columns and the Z-Order columns. Note that this config only takes effect with Spark 3.3.x. | 1.6.0 |
+| spark.sql.optimizer.twoPhaseRebalanceBeforeZorder.enabled | false | When true and `spark.sql.optimizer.rebalanceBeforeZorder.enabled` is true, we do a two-phase rebalance before Z-Order for dynamic partition writes. The first phase rebalances using the dynamic partition columns; the second phase rebalances using the dynamic partition columns and the Z-Order columns. Note that this config only takes effect with Spark 3.3.x. | 1.6.0 |
+| spark.sql.optimizer.zorderUsingOriginalOrdering.enabled | false | When true and `spark.sql.optimizer.rebalanceBeforeZorder.enabled` is true, we sort by the original ordering, i.e. lexicographical order.
Note that this config only takes effect with Spark 3.3.x. | 1.6.0 |
+| spark.sql.optimizer.inferRebalanceAndSortOrders.enabled | false | When true, infer the columns for rebalance and sort orders from the original query, e.g. the join keys from a join. It can avoid compression ratio regression. | 1.7.0 |
+| spark.sql.optimizer.inferRebalanceAndSortOrdersMaxColumns | 3 | The max number of inferred columns. | 1.7.0 |
+| spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled | false | When true, add a repartition even if the original plan does not have a shuffle. | 1.7.0 |
+| spark.sql.optimizer.finalStageConfigIsolationWriteOnly.enabled | true | When true, only enable final stage isolation for writing. | 1.7.0 |
+| spark.sql.finalWriteStageEagerlyKillExecutors.enabled | false | When true, eagerly kill redundant executors before running the final write stage. | 1.8.0 |
+| spark.sql.finalWriteStageResourceIsolation.enabled | false | When true, isolate the resources of the final write stage using a custom RDD resource profile. | 1.8.0 |
+| spark.sql.finalWriteStageNumPartitionFactor | 1 | If target executors * factor < active executors, and target executors > min executors, then kill redundant executors or inject a custom resource profile. | 1.8.0 |
+| spark.sql.finalWriteStageExecutorCores | fallback spark.executor.cores | Specify the executor core request for the final write stage. It would be passed to the RDD resource profile. | 1.8.0 |
+| spark.sql.finalWriteStageExecutorMemory | fallback spark.executor.memory | Specify the executor on-heap memory request for the final write stage. It would be passed to the RDD resource profile. | 1.8.0 |
+| spark.sql.finalWriteStageExecutorMemoryOverhead | fallback spark.executor.memoryOverhead | Specify the executor memory overhead request for the final write stage. It would be passed to the RDD resource profile. | 1.8.0 |
+| spark.sql.finalWriteStageExecutorOffHeapMemory | NONE | Specify the executor off-heap memory request for the final write stage. It would be passed to the RDD resource profile.
| 1.8.0 | diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala index ef9da41be13..0d034c26cf1 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala @@ -17,6 +17,7 @@ package org.apache.kyuubi.sql +import org.apache.spark.FinalStageResourceManager import org.apache.spark.sql.SparkSessionExtensions import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxPartitionStrategy} @@ -39,5 +40,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) { // watchdog extension extensions.injectOptimizerRule(ForcedMaxOutputRowsRule) extensions.injectPlannerStrategy(MaxPartitionStrategy) + + extensions.injectQueryStagePrepRule(FinalStageResourceManager) } } diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/CustomResourceProfileExec.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/CustomResourceProfileExec.scala new file mode 100644 index 00000000000..38b57ed44b5 --- /dev/null +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/CustomResourceProfileExec.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.spark.network.util.{ByteUnit, JavaUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.vectorized.ColumnarBatch + +import org.apache.kyuubi.sql.KyuubiSQLConf._ + +/** + * This node wraps the final executed plan and inject custom resource profile to the RDD. + * It assumes that, the produced RDD would create the `ResultStage` in `DAGScheduler`, + * so it makes resource isolation between previous and final stage. + * + * Note that, Spark does not support config `minExecutors` for each resource profile. + * Which means, it would retain `minExecutors` for each resource profile. + * So, suggest set `spark.dynamicAllocation.minExecutors` to 0 if enable this feature. 
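+ *
+ * The injected profile requests executor cores, memory and memory overhead from the
+ * `spark.sql.finalWriteStageExecutor*` configs, falling back to the corresponding
+ * `spark.executor.*` settings, plus executor off-heap memory when it is configured.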
+ */ +case class CustomResourceProfileExec(child: SparkPlan) extends UnaryExecNode { + override def output: Seq[Attribute] = child.output + + override def supportsColumnar: Boolean = child.supportsColumnar + + override def supportsRowBased: Boolean = child.supportsRowBased + + private val executorCores = conf.getConf(FINAL_WRITE_STAGE_EXECUTOR_CORES).getOrElse( + sparkContext.getConf.getInt("spark.executor.cores", 1)) + private val executorMemory = conf.getConf(FINAL_WRITE_STAGE_EXECUTOR_MEMORY).getOrElse( + sparkContext.getConf.get("spark.executor.memory", "2G")) + private val executorMemoryOverhead = + conf.getConf(FINAL_WRITE_STAGE_EXECUTOR_MEMORY_OVERHEAD) + .getOrElse(sparkContext.getConf.get("spark.executor.memoryOverhead", "1G")) + private val executorOffHeapMemory = conf.getConf(FINAL_WRITE_STAGE_EXECUTOR_OFF_HEAP_MEMORY) + + override lazy val metrics: Map[String, SQLMetric] = { + val base = Map( + "executorCores" -> SQLMetrics.createMetric(sparkContext, "executor cores"), + "executorMemory" -> SQLMetrics.createMetric(sparkContext, "executor memory (MiB)"), + "executorMemoryOverhead" -> SQLMetrics.createMetric( + sparkContext, + "executor memory overhead (MiB)")) + val addition = executorOffHeapMemory.map(_ => + "executorOffHeapMemory" -> + SQLMetrics.createMetric(sparkContext, "executor off heap memory (MiB)")).toMap + base ++ addition + } + + private def wrapResourceProfile[T](rdd: RDD[T]): RDD[T] = { + metrics("executorCores") += executorCores + metrics("executorMemory") += JavaUtils.byteStringAs(executorMemory, ByteUnit.MiB) + metrics("executorMemoryOverhead") += JavaUtils.byteStringAs( + executorMemoryOverhead, + ByteUnit.MiB) + executorOffHeapMemory.foreach(m => + metrics("executorOffHeapMemory") += JavaUtils.byteStringAs(m, ByteUnit.MiB)) + + val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq) + + val resourceProfileBuilder = new ResourceProfileBuilder() + val executorResourceRequests = new ExecutorResourceRequests() + executorResourceRequests.cores(executorCores) + executorResourceRequests.memory(executorMemory) + executorResourceRequests.memoryOverhead(executorMemoryOverhead) + executorOffHeapMemory.foreach(executorResourceRequests.offHeapMemory) + resourceProfileBuilder.require(executorResourceRequests) + rdd.withResources(resourceProfileBuilder.build()) + rdd + } + + override protected def doExecute(): RDD[InternalRow] = { + val rdd = child.execute() + wrapResourceProfile(rdd) + } + + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { + val rdd = child.executeColumnar() + wrapResourceProfile(rdd) + } + + override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = { + this.copy(child = newChild) + } +} diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala new file mode 100644 index 00000000000..6838abf798e --- /dev/null +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/FinalStageResourceManager.scala @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec, SparkPlan}
+import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec}
+
+import org.apache.kyuubi.sql.{KyuubiSQLConf, MarkNumOutputColumnsRule}
+
+/**
+ * This rule assumes the final write stage requires fewer cores than the previous stages,
+ * otherwise it takes no effect.
+ *
+ * It provides two features:
+ * 1. Kill redundant executors before running the final write stage
+ * 2. Inject a custom resource profile for the final write stage, so we can specify a custom
+ *    executor resource config
+ */
+case class FinalStageResourceManager(session: SparkSession) extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_RESOURCE_ISOLATION_ENABLED) &&
+      !conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED)) {
+      return plan
+    }
+
+    if (!MarkNumOutputColumnsRule.isWrite(session, plan)) {
+      return plan
+    }
+
+    val sc = session.sparkContext
+    val dra = sc.getConf.getBoolean("spark.dynamicAllocation.enabled", false)
+    val executorCores = sc.getConf.getInt("spark.executor.cores", 1)
+    val minExecutors = sc.getConf.getInt("spark.dynamicAllocation.minExecutors", 0)
+    val maxExecutors = sc.getConf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
+    val hasImprovementRoom = maxExecutors - minExecutors > 32
+    // Fast fail if:
+    // 1. resource profile is only supported when DRA is enabled
+    // 2. DRA only works with YARN and K8s
+    // 3. logically, DRA should be able to kill a lot of executors, otherwise this rule has
+    //    no benefits. 32 is a value to make sure we have room for improvement.
+    if (!dra || !sc.schedulerBackend.isInstanceOf[CoarseGrainedSchedulerBackend] ||
+      !hasImprovementRoom) {
+      return plan
+    }
+
+    val stage = findFinalRebalanceStage(plan)
+    if (stage.isEmpty) {
+      return plan
+    }
+
+    // Since we are in `prepareQueryStage`, the AQE shuffle read has not been applied.
+    // So we need to apply it ourselves.
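+    // Applying the query stage optimizer rules below converts the materialized rebalance
+    // stage into an `AQEShuffleReadExec`, whose partition specs tell us the expected
+    // parallelism (and therefore the target executor count) of the final write stage.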
+ val shuffleRead = queryStageOptimizerRules.foldLeft(stage.get.asInstanceOf[SparkPlan]) { + case (latest, rule) => rule.apply(latest) + } + shuffleRead match { + case AQEShuffleReadExec(stage: ShuffleQueryStageExec, partitionSpecs) => + val factor = conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_PARTITION_FACTOR) + // The condition whether inject custom resource profile: + // - target executors * factor < active executors + // - target executors > min executors + val numActiveExecutors = sc.getExecutorIds().length + val expectedCores = partitionSpecs.length + val targetExecutors = (expectedCores / executorCores) + 1 + val hasBenefits = targetExecutors * factor < numActiveExecutors && + targetExecutors > minExecutors + if (hasBenefits) { + val shuffleId = stage.plan.asInstanceOf[ShuffleExchangeExec].shuffleDependency.shuffleId + val numReduce = stage.plan.asInstanceOf[ShuffleExchangeExec].numPartitions + // Now, there is only a final stage waiting to execute and all tasks of previous stage + // are finished. Here, we kill redundant existed executors eagerly so the tasks of final + // stage can be centralized scheduled. + if (conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED)) { + killExecutors(sc, targetExecutors, shuffleId, numReduce) + } + if (conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_RESOURCE_ISOLATION_ENABLED)) { + // TODO: Logically, We can call `backend.requestTotalExecutors` eagerly + // to reduce the task submit pending time, but it may lose task locality + injectCustomResourceProfile(plan, stage.id) + } else { + plan + } + } else { + logInfo(s"Has no benefits to kill executors or inject custom resource profile, " + + s"active executors: $numActiveExecutors, min executor: $minExecutors, " + + s"target executors: $targetExecutors.") + plan + } + + case _ => + plan + } + } + + /** + * The priority of kill executors follow: + * 1. kill executor who is younger than other (The older the JIT works better) + * 2. kill executor who produces less shuffle data first + */ + private def findExecutorToKill( + sc: SparkContext, + targetExecutors: Int, + shuffleId: Int, + numReduce: Int): Seq[String] = { + val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + val shuffleStatus = tracker.shuffleStatuses(shuffleId) + val executorToBlockSize = new mutable.HashMap[String, Long] + shuffleStatus.withMapStatuses { mapStatus => + mapStatus.foreach { status => + var i = 0 + var sum = 0L + while (i < numReduce) { + sum += status.getSizeForBlock(i) + i += 1 + } + executorToBlockSize.getOrElseUpdate(status.location.executorId, sum) + } + } + + val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend] + val executorsWithRegistrationTs = backend.getExecutorsWithRegistrationTs() + val existedExecutors = executorsWithRegistrationTs.keys.toSet + val expectedNumExecutorToKill = existedExecutors.size - targetExecutors + if (expectedNumExecutorToKill < 1) { + return Seq.empty + } + + val executorIdsToKill = new ArrayBuffer[String]() + if (executorToBlockSize.size < expectedNumExecutorToKill) { + // The last stage is running fast and finished in a short time. The existed executors are + // from previous stages that have not been killed by DRA, so we can not find it by tracking + // shuffle status. + // We should evict executors by their alive time first and retain all of executors which + // have better locality for shuffle block. 
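+      // First pick executors that report no shuffle blocks for this stage; the remaining
+      // candidates are then evicted below, ordered by ascending shuffle block size.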
+ val numExecutorToKill = expectedNumExecutorToKill - executorToBlockSize.size + executorsWithRegistrationTs.toSeq.sortBy(_._2).foreach { case (id, _) => + if (executorIdsToKill.length < numExecutorToKill && !executorToBlockSize.contains(id)) { + executorIdsToKill.append(id) + } + } + } + + // Evict the rest executors according to the shuffle block size + executorToBlockSize.toSeq.sortBy(_._2).foreach { case (id, _) => + if (executorIdsToKill.length < expectedNumExecutorToKill) { + executorIdsToKill.append(id) + } + } + + executorIdsToKill.toSeq + } + + private def killExecutors( + sc: SparkContext, + targetExecutors: Int, + shuffleId: Int, + numReduce: Int): Unit = { + val executorAllocationClient = sc.schedulerBackend.asInstanceOf[ExecutorAllocationClient] + + val executorsToKill = + if (conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_RESOURCE_ISOLATION_ENABLED)) { + // If we decide to use custom resource profile, the existed executors have no meaning + // any more. So kill all of them. + executorAllocationClient.getExecutorIds() + } else { + findExecutorToKill(sc, targetExecutors, shuffleId, numReduce) + } + + logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " + + s"[${executorsToKill.mkString(", ")}].") + + // It is a little hack to kill executors with DRA enabled. + // It may cause the status in `ExecutorAllocationManager` inconsistent with + // `CoarseGrainedSchedulerBackend` for a while. But it should be sync finally. + executorAllocationClient.killExecutors( + executorIds = executorsToKill, + adjustTargetNumExecutors = false, + countFailures = false, + force = false) + } + + private def injectCustomResourceProfile(plan: SparkPlan, id: Int): SparkPlan = { + plan match { + case stage: ShuffleQueryStageExec if stage.id == id => + CustomResourceProfileExec(stage) + case _ => plan.mapChildren(child => injectCustomResourceProfile(child, id)) + } + } + + @tailrec + private def findFinalRebalanceStage(plan: SparkPlan): Option[ShuffleQueryStageExec] = { + plan match { + case p: ProjectExec => findFinalRebalanceStage(p.child) + case f: FilterExec => findFinalRebalanceStage(f.child) + case s: SortExec if !s.global => findFinalRebalanceStage(s.child) + case stage: ShuffleQueryStageExec + if stage.isMaterialized && + stage.plan.isInstanceOf[ShuffleExchangeExec] && + stage.plan.asInstanceOf[ShuffleExchangeExec].shuffleOrigin != ENSURE_REQUIREMENTS => + Some(stage) + case _ => None + } + } + + @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( + OptimizeSkewInRebalancePartitions, + CoalesceShufflePartitions(session), + OptimizeShuffleWithLocalRead) +} diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/InjectResourceProfileSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/InjectResourceProfileSuite.scala new file mode 100644 index 00000000000..5db5e7e6cf5 --- /dev/null +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/InjectResourceProfileSuite.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} +import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate + +import org.apache.kyuubi.sql.KyuubiSQLConf + +class InjectResourceProfileSuite extends KyuubiSparkSQLExtensionTest { + private def checkCustomResourceProfile(sqlString: String, exists: Boolean): Unit = { + @volatile var lastEvent: SparkListenerSQLAdaptiveExecutionUpdate = null + val listener = new SparkListener { + override def onOtherEvent(event: SparkListenerEvent): Unit = { + event match { + case e: SparkListenerSQLAdaptiveExecutionUpdate => lastEvent = e + case _ => + } + } + } + + spark.sparkContext.addSparkListener(listener) + try { + sql(sqlString).collect() + spark.sparkContext.listenerBus.waitUntilEmpty() + + assert(lastEvent != null) + var current = lastEvent.sparkPlanInfo + var shouldStop = false + while (!shouldStop) { + if (current.nodeName != "CustomResourceProfile") { + if (current.children.isEmpty) { + assert(!exists) + shouldStop = true + } else { + current = current.children.head + } + } else { + assert(exists) + shouldStop = true + } + } + } finally { + spark.sparkContext.removeSparkListener(listener) + } + } + + test("Inject resource profile") { + withTable("t") { + withSQLConf( + "spark.sql.adaptive.forceApply" -> "true", + KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true", + KyuubiSQLConf.FINAL_WRITE_STAGE_RESOURCE_ISOLATION_ENABLED.key -> "true") { + + sql("CREATE TABLE t (c1 int, c2 string) USING PARQUET") + + checkCustomResourceProfile("INSERT INTO TABLE t VALUES(1, 'a')", false) + checkCustomResourceProfile("SELECT 1", false) + } + } + } +} diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala index 0fe9f649eaa..c993d554f4f 100644 --- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala +++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala @@ -190,4 +190,61 @@ object KyuubiSQLConf { .version("1.7.0") .booleanConf .createWithDefault(true) + + val FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED = + buildConf("spark.sql.finalWriteStageEagerlyKillExecutors.enabled") + .doc("When true, eagerly kill redundant executors before running final write stage.") + .version("1.8.0") + .booleanConf + .createWithDefault(false) + + val FINAL_WRITE_STAGE_RESOURCE_ISOLATION_ENABLED = + buildConf("spark.sql.finalWriteStageResourceIsolation.enabled") + .doc( + "When true, make final write stage resource isolation using custom RDD resource profile.") + .version("1.8.0") + .booleanConf + .createWithDefault(false) + + val FINAL_WRITE_STAGE_PARTITION_FACTOR = + buildConf("spark.sql.finalWriteStageNumPartitionFactor") + .doc("If the target executors * factor < active executors, and " + + "target executors > min executors, then inject kill executors or inject" + + "custom resource profile.") + 
.version("1.8.0") + .doubleConf + .checkValue(_ >= 1, "must be bigger than or equal to 1") + .createWithDefault(1) + + val FINAL_WRITE_STAGE_EXECUTOR_CORES = + buildConf("spark.sql.finalWriteStageExecutorCores") + .doc("Specify the executor core request for final write stage. " + + "It would be passed to the RDD resource profile.") + .version("1.8.0") + .intConf + .createOptional + + val FINAL_WRITE_STAGE_EXECUTOR_MEMORY = + buildConf("spark.sql.finalWriteStageExecutorMemory") + .doc("Specify the executor on heap memory request for final write stage. " + + "It would be passed to the RDD resource profile.") + .version("1.8.0") + .stringConf + .createOptional + + val FINAL_WRITE_STAGE_EXECUTOR_MEMORY_OVERHEAD = + buildConf("spark.sql.finalWriteStageExecutorMemoryOverhead") + .doc("Specify the executor memory overhead request for final write stage. " + + "It would be passed to the RDD resource profile.") + .version("1.8.0") + .stringConf + .createOptional + + val FINAL_WRITE_STAGE_EXECUTOR_OFF_HEAP_MEMORY = + buildConf("spark.sql.finalWriteStageExecutorOffHeapMemory") + .doc("Specify the executor off heap memory request for final write stage. " + + "It would be passed to the RDD resource profile.") + .version("1.8.0") + .stringConf + .createOptional } diff --git a/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala b/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala index fd81948c61a..e58ac726c13 100644 --- a/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala +++ b/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala @@ -29,6 +29,8 @@ import org.apache.kyuubi.sql.KyuubiSQLConf trait KyuubiSparkSQLExtensionTest extends QueryTest with SQLTestUtils with AdaptiveSparkPlanHelper { + sys.props.put("spark.testing", "1") + private var _spark: Option[SparkSession] = None protected def spark: SparkSession = _spark.getOrElse { throw new RuntimeException("test spark session don't initial before using it.")