diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 674c7d06080..00e5772a2f5 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -59,17 +59,17 @@ jobs:
- java: 8
spark: '3.3'
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.1.3 -Dspark.archive.name=spark-3.1.3-bin-hadoop3.2.tgz -Pzookeeper-3.6'
- exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest'
+ exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
comment: 'verify-on-spark-3.1-binary'
- java: 8
spark: '3.3'
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6'
- exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest'
+ exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
comment: 'verify-on-spark-3.2-binary'
- java: 8
spark: '3.3'
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.4.0 -Dspark.archive.name=spark-3.4.0-bin-hadoop3.tgz -Pzookeeper-3.6'
- exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest'
+ exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
comment: 'verify-on-spark-3.4-binary'
exclude:
# SPARK-33772: Spark supports JDK 17 since 3.3.0
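
Context for the workflow change above: the three verify-on-spark-*-binary jobs run modules built against Spark 3.3 on other pre-built Spark releases, where the downloaded distribution is not set up for local-cluster mode, so the new org.apache.kyuubi.tags.SparkLocalClusterTest tag is appended to their excluded tags. A minimal sketch (hypothetical suite name, assuming the maven.plugin.scalatest.exclude.tags property feeds ScalaTest's excluded-tags option) of how such a tag gates a whole suite:

```scala
// Hypothetical sketch: a suite carrying the tag annotation is skipped when the
// ScalaTest runner is told to exclude org.apache.kyuubi.tags.SparkLocalClusterTest,
// which is roughly what the Maven property above expands to.
import org.scalatest.funsuite.AnyFunSuite
import org.apache.kyuubi.tags.SparkLocalClusterTest

@SparkLocalClusterTest
class NeedsLocalClusterSuite extends AnyFunSuite {
  test("runs only when local-cluster tests are not excluded") {
    assert(2 + 2 == 4)
  }
}
```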
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/pom.xml b/extensions/spark/kyuubi-extension-spark-3-3/pom.xml
index 98c1cca0212..51d21f6844c 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/pom.xml
+++ b/extensions/spark/kyuubi-extension-spark-3-3/pom.xml
@@ -37,6 +37,14 @@
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kyuubi</groupId>
+      <artifactId>kyuubi-download</artifactId>
+      <version>${project.version}</version>
+      <type>pom</type>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.kyuubi</groupId>
       <artifactId>kyuubi-extension-spark-common_${scala.binary.version}</artifactId>
@@ -45,6 +53,14 @@
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kyuubi</groupId>
+      <artifactId>kyuubi-util-scala_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
@@ -130,6 +146,38 @@
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>regex-property</id>
+            <goals>
+              <goal>regex-property</goal>
+            </goals>
+            <configuration>
+              <name>spark.home</name>
+              <value>${project.basedir}/../../../externals/kyuubi-download/target/${spark.archive.name}</value>
+              <regex>(.+)\.tgz</regex>
+              <replacement>$1</replacement>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <configuration>
+          <environmentVariables>
+            <SPARK_HOME>${spark.home}</SPARK_HOME>
+            <SPARK_SCALA_VERSION>${scala.binary.version}</SPARK_SCALA_VERSION>
+          </environmentVariables>
+        </configuration>
+      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index 5d346422848..792315d897a 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -40,7 +40,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
extensions.injectPlannerStrategy(MaxScanStrategy)
- extensions.injectQueryStagePrepRule(FinalStageResourceManager)
+ extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)
}
}
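
This one-character change is required by the new explicit companion object added in FinalStageResourceManager.scala below: Scala only makes a case-class companion usable directly as a function value when the companion is fully compiler-generated (it then extends AbstractFunction1), so once a hand-written `object FinalStageResourceManager` exists, the call site must eta-expand the apply method with `FinalStageResourceManager(_)`. A self-contained Scala 2 sketch with hypothetical names:

```scala
// With a purely synthetic companion, the companion object itself is a
// Function1 and can be passed where a function is expected:
case class Synth(n: Int)
val f: Int => Synth = Synth // compiles: synthetic companion extends AbstractFunction1

// Once an explicit companion is written, no Function1 parent is synthesized:
case class Manual(n: Int)
object Manual { def helper: Int = 0 }
// val g: Int => Manual = Manual   // would not compile
val g: Int => Manual = Manual(_)   // eta-expansion of apply restores it
```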
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
index dc573f83867..32fb9f5ce84 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
@@ -22,6 +22,8 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{ExecutorAllocationClient, MapOutputTrackerMaster, SparkContext, SparkEnv}
+import org.apache.spark.internal.Logging
+import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec, SparkPlan}
@@ -185,7 +187,12 @@ case class FinalStageResourceManager(session: SparkSession)
numReduce: Int): Unit = {
val executorAllocationClient = sc.schedulerBackend.asInstanceOf[ExecutorAllocationClient]
- val executorsToKill = findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
+ val executorsToKill =
+ if (conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL)) {
+ executorAllocationClient.getExecutorIds()
+ } else {
+ findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
+ }
logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " +
s"[${executorsToKill.mkString(", ")}].")
if (executorsToKill.isEmpty) {
@@ -210,6 +217,14 @@ case class FinalStageResourceManager(session: SparkSession)
adjustTargetNumExecutors = true,
countFailures = false,
force = false)
+
+ FinalStageResourceManager.getAdjustedTargetExecutors(sc)
+ .filter(_ < targetExecutors).foreach { adjustedExecutors =>
+ val delta = targetExecutors - adjustedExecutors
+ logInfo(s"Target executors after kill ($adjustedExecutors) is lower than required " +
+ s"($targetExecutors). Requesting $delta additional executor(s).")
+ executorAllocationClient.requestExecutors(delta)
+ }
}
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
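
The re-request block above is the heart of the fix: killExecutors(..., adjustTargetNumExecutors = true) can push the scheduler's requested total below what the final stage needs, and if every executor is gone nothing remains to run it, so the rule tops the request back up. Worked numbers (assumed values, mirroring the suite added below):

```scala
// Assumed values: the final stage needs 1 executor, killAll removed all 3,
// and the scheduler's adjusted request fell to 0.
val targetExecutors = 1
val adjusted: Option[Int] = Some(0) // what getAdjustedTargetExecutors would return
adjusted.filter(_ < targetExecutors).foreach { a =>
  val delta = targetExecutors - a // 1 - 0 = 1: re-request exactly one executor
  assert(delta == 1)
}
```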
@@ -218,6 +233,31 @@ case class FinalStageResourceManager(session: SparkSession)
OptimizeShuffleWithLocalRead)
}
+object FinalStageResourceManager extends Logging {
+
+ private[sql] def getAdjustedTargetExecutors(sc: SparkContext): Option[Int] = {
+ sc.schedulerBackend match {
+ case schedulerBackend: CoarseGrainedSchedulerBackend =>
+ try {
+ val field = classOf[CoarseGrainedSchedulerBackend]
+ .getDeclaredField("requestedTotalExecutorsPerResourceProfile")
+ field.setAccessible(true)
+ schedulerBackend.synchronized {
+ val requestedTotalExecutorsPerResourceProfile =
+ field.get(schedulerBackend).asInstanceOf[mutable.HashMap[ResourceProfile, Int]]
+ val defaultRp = sc.resourceProfileManager.defaultResourceProfile
+ requestedTotalExecutorsPerResourceProfile.get(defaultRp)
+ }
+ } catch {
+ case e: Exception =>
+ logWarning("Failed to get requestedTotalExecutors of Default ResourceProfile", e)
+ None
+ }
+ case _ => None
+ }
+ }
+}
+
trait FinalRebalanceStageHelper extends AdaptiveSparkPlanHelper {
@tailrec
final protected def findFinalRebalanceStage(plan: SparkPlan): Option[ShuffleQueryStageExec] = {
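
getAdjustedTargetExecutors goes through reflection because requestedTotalExecutorsPerResourceProfile is a private field of CoarseGrainedSchedulerBackend with no public accessor; the try/catch returning None guards against the field moving across Spark versions. The same private-field pattern, reduced to a runnable sketch with hypothetical names:

```scala
import scala.collection.mutable

// Hypothetical stand-in for the scheduler backend's private bookkeeping.
class Backend {
  private val requestedTotals = mutable.HashMap[String, Int]("default" -> 1)
}

val backend = new Backend
val field = classOf[Backend].getDeclaredField("requestedTotals")
field.setAccessible(true) // bypass private access, as the rule does
val totals = field.get(backend).asInstanceOf[mutable.HashMap[String, Int]]
println(totals.get("default")) // Some(1)
```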
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala
new file mode 100644
index 00000000000..4b9991ef6f2
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkConf
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+import org.apache.kyuubi.tags.SparkLocalClusterTest
+
+@SparkLocalClusterTest
+class FinalStageResourceManagerSuite extends KyuubiSparkSQLExtensionTest {
+
+ override def sparkConf(): SparkConf = {
+    // It is difficult to run Spark in local-cluster mode when the spark.testing system property is set.
+ sys.props.remove("spark.testing")
+
+ super.sparkConf().set("spark.master", "local-cluster[3, 1, 1024]")
+ .set("spark.dynamicAllocation.enabled", "true")
+ .set("spark.dynamicAllocation.initialExecutors", "3")
+ .set("spark.dynamicAllocation.minExecutors", "1")
+ .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
+ .set(KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key, "true")
+ .set(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED.key, "true")
+ }
+
+ test("[KYUUBI #5136][Bug] Final Stage hangs forever") {
+    // Prerequisites for reproducing the bug:
+    // 1. Dynamic allocation is enabled.
+    // 2. Dynamic allocation min executors is 1.
+    // 3. Target executors < active executors.
+    // 4. No active executor is left after FinalStageResourceManager killed executors.
+    //    This can happen because the executors FinalStageResourceManager retains may already
+    //    have been requested to be killed but have not exited yet.
+    // 5. The final stage requires exactly 1 executor.
+ withSQLConf(
+ (KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL.key, "true")) {
+ withTable("final_stage") {
+ eventually(timeout(Span(10, Minutes))) {
+ sql(
+ "CREATE TABLE final_stage AS SELECT id, count(*) as num FROM (SELECT 0 id) GROUP BY id")
+ }
+ assert(FinalStageResourceManager.getAdjustedTargetExecutors(spark.sparkContext).get == 1)
+ }
+ }
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml
index 947c03ea025..20db5d12f18 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml
+++ b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml
@@ -55,6 +55,22 @@
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kyuubi</groupId>
+      <artifactId>kyuubi-download</artifactId>
+      <version>${project.version}</version>
+      <type>pom</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kyuubi</groupId>
+      <artifactId>kyuubi-util-scala_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
org.apache.spark
spark-core_${scala.binary.version}
@@ -111,11 +127,49 @@
       <artifactId>jakarta.xml.bind-api</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>

   <build>
     <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>regex-property</id>
+            <goals>
+              <goal>regex-property</goal>
+            </goals>
+            <configuration>
+              <name>spark.home</name>
+              <value>${project.basedir}/../../../externals/kyuubi-download/target/${spark.archive.name}</value>
+              <regex>(.+)\.tgz</regex>
+              <replacement>$1</replacement>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <configuration>
+          <environmentVariables>
+            <SPARK_HOME>${spark.home}</SPARK_HOME>
+            <SPARK_SCALA_VERSION>${scala.binary.version}</SPARK_SCALA_VERSION>
+          </environmentVariables>
+        </configuration>
+      </plugin>
       <plugin>
         <groupId>org.antlr</groupId>
         <artifactId>antlr4-maven-plugin</artifactId>
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index fa118a3e28b..6f45dae126e 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -210,6 +210,14 @@ object KyuubiSQLConf {
.booleanConf
.createWithDefault(false)
+ val FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL =
+ buildConf("spark.sql.finalWriteStage.eagerlyKillExecutors.killAll")
+ .doc("When true, eagerly kill all executors before running final write stage. " +
+ "Mainly for test.")
+ .version("1.8.0")
+ .booleanConf
+ .createWithDefault(false)
+
val FINAL_WRITE_STAGE_SKIP_KILLING_EXECUTORS_FOR_TABLE_CACHE =
buildConf("spark.sql.finalWriteStage.skipKillingExecutorsForTableCache")
.doc("When true, skip killing executors if the plan has table caches.")
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index 5d346422848..792315d897a 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -40,7 +40,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
extensions.injectPlannerStrategy(MaxScanStrategy)
- extensions.injectQueryStagePrepRule(FinalStageResourceManager)
+ extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)
}
}
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
index 16002dfa0fa..81873476cc4 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
@@ -22,6 +22,8 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{ExecutorAllocationClient, MapOutputTrackerMaster, SparkContext, SparkEnv}
+import org.apache.spark.internal.Logging
+import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec, SparkPlan}
@@ -188,7 +190,12 @@ case class FinalStageResourceManager(session: SparkSession)
numReduce: Int): Unit = {
val executorAllocationClient = sc.schedulerBackend.asInstanceOf[ExecutorAllocationClient]
- val executorsToKill = findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
+ val executorsToKill =
+ if (conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL)) {
+ executorAllocationClient.getExecutorIds()
+ } else {
+ findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
+ }
logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " +
s"[${executorsToKill.mkString(", ")}].")
if (executorsToKill.isEmpty) {
@@ -213,6 +220,14 @@ case class FinalStageResourceManager(session: SparkSession)
adjustTargetNumExecutors = true,
countFailures = false,
force = false)
+
+ FinalStageResourceManager.getAdjustedTargetExecutors(sc)
+ .filter(_ < targetExecutors).foreach { adjustedExecutors =>
+ val delta = targetExecutors - adjustedExecutors
+ logInfo(s"Target executors after kill ($adjustedExecutors) is lower than required " +
+ s"($targetExecutors). Requesting $delta additional executor(s).")
+ executorAllocationClient.requestExecutors(delta)
+ }
}
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
@@ -221,6 +236,31 @@ case class FinalStageResourceManager(session: SparkSession)
OptimizeShuffleWithLocalRead)
}
+object FinalStageResourceManager extends Logging {
+
+ private[sql] def getAdjustedTargetExecutors(sc: SparkContext): Option[Int] = {
+ sc.schedulerBackend match {
+ case schedulerBackend: CoarseGrainedSchedulerBackend =>
+ try {
+ val field = classOf[CoarseGrainedSchedulerBackend]
+ .getDeclaredField("requestedTotalExecutorsPerResourceProfile")
+ field.setAccessible(true)
+ schedulerBackend.synchronized {
+ val requestedTotalExecutorsPerResourceProfile =
+ field.get(schedulerBackend).asInstanceOf[mutable.HashMap[ResourceProfile, Int]]
+ val defaultRp = sc.resourceProfileManager.defaultResourceProfile
+ requestedTotalExecutorsPerResourceProfile.get(defaultRp)
+ }
+ } catch {
+ case e: Exception =>
+ logWarning("Failed to get requestedTotalExecutors of Default ResourceProfile", e)
+ None
+ }
+ case _ => None
+ }
+ }
+}
+
trait FinalRebalanceStageHelper extends AdaptiveSparkPlanHelper {
@tailrec
final protected def findFinalRebalanceStage(plan: SparkPlan): Option[ShuffleQueryStageExec] = {
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala
new file mode 100644
index 00000000000..4b9991ef6f2
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkConf
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+import org.apache.kyuubi.tags.SparkLocalClusterTest
+
+@SparkLocalClusterTest
+class FinalStageResourceManagerSuite extends KyuubiSparkSQLExtensionTest {
+
+ override def sparkConf(): SparkConf = {
+    // It is difficult to run Spark in local-cluster mode when the spark.testing system property is set.
+ sys.props.remove("spark.testing")
+
+ super.sparkConf().set("spark.master", "local-cluster[3, 1, 1024]")
+ .set("spark.dynamicAllocation.enabled", "true")
+ .set("spark.dynamicAllocation.initialExecutors", "3")
+ .set("spark.dynamicAllocation.minExecutors", "1")
+ .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
+ .set(KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key, "true")
+ .set(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED.key, "true")
+ }
+
+ test("[KYUUBI #5136][Bug] Final Stage hangs forever") {
+    // Prerequisites for reproducing the bug:
+    // 1. Dynamic allocation is enabled.
+    // 2. Dynamic allocation min executors is 1.
+    // 3. Target executors < active executors.
+    // 4. No active executor is left after FinalStageResourceManager killed executors.
+    //    This can happen because the executors FinalStageResourceManager retains may already
+    //    have been requested to be killed but have not exited yet.
+    // 5. The final stage requires exactly 1 executor.
+ withSQLConf(
+ (KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL.key, "true")) {
+ withTable("final_stage") {
+ eventually(timeout(Span(10, Minutes))) {
+ sql(
+ "CREATE TABLE final_stage AS SELECT id, count(*) as num FROM (SELECT 0 id) GROUP BY id")
+ }
+ assert(FinalStageResourceManager.getAdjustedTargetExecutors(spark.sparkContext).get == 1)
+ }
+ }
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index fa118a3e28b..6f45dae126e 100644
--- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -210,6 +210,14 @@ object KyuubiSQLConf {
.booleanConf
.createWithDefault(false)
+ val FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL =
+ buildConf("spark.sql.finalWriteStage.eagerlyKillExecutors.killAll")
+ .doc("When true, eagerly kill all executors before running final write stage. " +
+ "Mainly for test.")
+ .version("1.8.0")
+ .booleanConf
+ .createWithDefault(false)
+
val FINAL_WRITE_STAGE_SKIP_KILLING_EXECUTORS_FOR_TABLE_CACHE =
buildConf("spark.sql.finalWriteStage.skipKillingExecutorsForTableCache")
.doc("When true, skip killing executors if the plan has table caches.")
diff --git a/kyuubi-util-scala/src/test/java/org/apache/kyuubi/tags/SparkLocalClusterTest.java b/kyuubi-util-scala/src/test/java/org/apache/kyuubi/tags/SparkLocalClusterTest.java
new file mode 100644
index 00000000000..dd718f125c3
--- /dev/null
+++ b/kyuubi-util-scala/src/test/java/org/apache/kyuubi/tags/SparkLocalClusterTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.tags;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.scalatest.TagAnnotation;
+
+@TagAnnotation
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD, ElementType.TYPE})
+public @interface SparkLocalClusterTest {}
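
RUNTIME retention matters here: ScalaTest matches @TagAnnotation markers against excluded-tag names reflectively at run time, so a weaker retention would make the tag invisible to the runner. A hypothetical snippet confirming that visibility:

```scala
// Hypothetical snippet: RUNTIME retention makes the tag visible to reflection,
// which is how the test runner matches it against excluded-tag names.
import org.apache.kyuubi.tags.SparkLocalClusterTest

@SparkLocalClusterTest
class TaggedSuite

val visible = classOf[TaggedSuite].isAnnotationPresent(classOf[SparkLocalClusterTest])
println(visible) // true
```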