6 changes: 3 additions & 3 deletions .github/workflows/master.yml
@@ -59,17 +59,17 @@ jobs:
- java: 8
spark: '3.3'
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.1.3 -Dspark.archive.name=spark-3.1.3-bin-hadoop3.2.tgz -Pzookeeper-3.6'
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest'
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
Contributor Author commented:
Tasks serialized by a Spark 3.3 driver cannot be deserialized by a Spark 3.1.3 executor.

comment: 'verify-on-spark-3.1-binary'
- java: 8
spark: '3.3'
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6'
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest'
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
comment: 'verify-on-spark-3.2-binary'
- java: 8
spark: '3.3'
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.4.0 -Dspark.archive.name=spark-3.4.0-bin-hadoop3.tgz -Pzookeeper-3.6'
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest'
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
comment: 'verify-on-spark-3.4-binary'
exclude:
# SPARK-33772: Spark supports JDK 17 since 3.3.0
48 changes: 48 additions & 0 deletions extensions/spark/kyuubi-extension-spark-3-3/pom.xml
@@ -37,6 +37,14 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-download</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-extension-spark-common_${scala.binary.version}</artifactId>
@@ -45,6 +53,14 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-util-scala_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
@@ -130,6 +146,38 @@
<build>

<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>regex-property</id>
<goals>
<goal>regex-property</goal>
</goals>
<configuration>
<name>spark.home</name>
<value>${project.basedir}/../../../externals/kyuubi-download/target/${spark.archive.name}</value>
<regex>(.+)\.tgz</regex>
<replacement>$1</replacement>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<configuration>
<environmentVariables>
<!--
Some tests run Spark in local-cluster mode.
This mode uses SPARK_HOME and SPARK_SCALA_VERSION to build the command that launches a Spark standalone cluster.
-->
<SPARK_HOME>${spark.home}</SPARK_HOME>
<SPARK_SCALA_VERSION>${scala.binary.version}</SPARK_SCALA_VERSION>
</environmentVariables>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
@@ -22,6 +22,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{ExecutorAllocationClient, MapOutputTrackerMaster, SparkContext, SparkEnv}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec, SparkPlan}
@@ -185,7 +186,12 @@ case class FinalStageResourceManager(session: SparkSession)
numReduce: Int): Unit = {
val executorAllocationClient = sc.schedulerBackend.asInstanceOf[ExecutorAllocationClient]

val executorsToKill = findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
val executorsToKill =
if (conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL)) {
executorAllocationClient.getExecutorIds()
} else {
findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
}
logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " +
s"[${executorsToKill.mkString(", ")}].")
if (executorsToKill.isEmpty) {
@@ -210,6 +216,38 @@
adjustTargetNumExecutors = true,
countFailures = false,
force = false)
Contributor commented:
Shall we call client.requestTotalExecutors to adjust the target executors if targetExecutors < draTargetExecutors after killing executors?

We might face an issue similar to apache/spark#19048.

Contributor commented:
IMO, we just need to change adjustTargetNumExecutors from true to false and call client.requestTotalExecutors to ensure the final target executor count is as expected.

Contributor Author commented:
Agree, but building the parameters for client.requestTotalExecutors involves many details, so I prefer to wait for ExecutorAllocationManager to call client.requestTotalExecutors.

As long as we do not call killExecutors with adjustTargetNumExecutors = true, ExecutorAllocationManager should be able to manage the target executor number correctly.
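For reference, a minimal sketch of the alternative discussed above: kill without adjusting DRA's target, then explicitly pin the total. This is illustrative only, not the change this PR makes; it assumes the Spark 3.x requestTotalExecutors signature, assumes the caller sits under an org.apache.spark package (the method is private[spark]), and reuses the surrounding names (sc, executorsToKill, targetExecutors) from the method shown in the diff above.

// Hypothetical sketch of the reviewer's suggestion, not the change merged in this PR.
executorAllocationClient.killExecutors(
  executorsToKill,
  adjustTargetNumExecutors = false, // leave the DRA target untouched while killing
  countFailures = false,
  force = false)
// Explicitly set the target for the default resource profile to the final-stage requirement.
val defaultRpId = sc.resourceProfileManager.defaultResourceProfile.id
executorAllocationClient.requestTotalExecutors(
  Map(defaultRpId -> targetExecutors),
  Map(defaultRpId -> 0),
  Map.empty)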


getAdjustedTargetExecutors(sc, executorAllocationClient)
.filter(_ < targetExecutors).foreach { adjustedExecutors =>
val delta = targetExecutors - adjustedExecutors
logInfo(s"Target executors after kill ($adjustedExecutors) is lower than required " +
s"($targetExecutors). Requesting $delta additional executor(s).")
executorAllocationClient.requestExecutors(delta)
}
}

private def getAdjustedTargetExecutors(
sc: SparkContext,
executorAllocationClient: ExecutorAllocationClient): Option[Int] = {
executorAllocationClient match {
case schedulerBackend: CoarseGrainedSchedulerBackend =>
try {
val field = classOf[CoarseGrainedSchedulerBackend]
.getDeclaredField("requestedTotalExecutorsPerResourceProfile")
field.setAccessible(true)
schedulerBackend.synchronized {
val requestedTotalExecutorsPerResourceProfile =
field.get(schedulerBackend).asInstanceOf[mutable.HashMap[ResourceProfile, Int]]
val defaultRp = sc.resourceProfileManager.defaultResourceProfile
requestedTotalExecutorsPerResourceProfile.get(defaultRp)
}
} catch {
case e: Exception =>
logWarning("Failed to get requestedTotalExecutors of Default ResourceProfile", e)
None
}
case _ => None
}
}

@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.SparkConf
import org.scalatest.time.{Minutes, Span}

import org.apache.kyuubi.sql.KyuubiSQLConf
import org.apache.kyuubi.tags.SparkLocalClusterTest

@SparkLocalClusterTest
class FinalStageResourceManagerSuite extends KyuubiSparkSQLExtensionTest {

override def sparkConf(): SparkConf = {
// It is difficult to run spark in local-cluster mode when spark.testing is set.
sys.props.remove("spark.testing")

super.sparkConf().set("spark.master", "local-cluster[3, 1, 1024]")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.initialExecutors", "3")
.set("spark.dynamicAllocation.minExecutors", "1")
.set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
.set(KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key, "true")
.set(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED.key, "true")
}

test("[KYUUBI #5136][Bug] Final Stage hangs forever") {
// Prerequisite to reproduce the bug:
// 1. Dynamic allocation is enabled.
// 2. Dynamic allocation min executors is 1.
// 3. target executors < active executors.
// 4. No active executor is left after FinalStageResourceManager killed executors.
// This is possible because FinalStageResourceManager retained executors may already be
// requested to be killed but not died yet.
// 5. Final Stage required executors is 1.
withSQLConf(
(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL.key, "true")) {
withTable("final_stage") {
eventually(timeout(Span(10, Minutes))) {
sql(
"CREATE TABLE final_stage AS SELECT id, count(*) as num FROM (SELECT 0 id) GROUP BY id")
}
Contributor commented:
Shall we call getAdjustedTargetExecutors at the end to make sure the target executor number is 1?

Contributor Author commented:
done

}
}
}
}
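The reply above refers to a follow-up change that is not part of the four commits shown here. A hypothetical version of such a check, placed at the end of the test body and reusing the same reflective access as FinalStageResourceManager.getAdjustedTargetExecutors, could look like the following; the field name comes from the diff above, while the surrounding test helpers (e.g. the suite's spark handle) are assumptions.

import scala.collection.mutable
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

// Hypothetical assertion: after the final write stage, the requested total for the
// default resource profile should have converged to the single required executor.
val sc = spark.sparkContext
val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
val field = classOf[CoarseGrainedSchedulerBackend]
  .getDeclaredField("requestedTotalExecutorsPerResourceProfile")
field.setAccessible(true)
val requestedPerRp = field.get(backend).asInstanceOf[mutable.HashMap[ResourceProfile, Int]]
val defaultRp = sc.resourceProfileManager.defaultResourceProfile
assert(requestedPerRp.get(defaultRp).contains(1))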
54 changes: 54 additions & 0 deletions extensions/spark/kyuubi-extension-spark-3-4/pom.xml
@@ -55,6 +55,22 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-download</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-util-scala_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -111,11 +127,49 @@
<artifactId>jakarta.xml.bind-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>

<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>regex-property</id>
<goals>
<goal>regex-property</goal>
</goals>
<configuration>
<name>spark.home</name>
<value>${project.basedir}/../../../externals/kyuubi-download/target/${spark.archive.name}</value>
<regex>(.+)\.tgz</regex>
<replacement>$1</replacement>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<configuration>
<environmentVariables>
<!--
Some tests run Spark in local-cluster mode.
This mode uses SPARK_HOME and SPARK_SCALA_VERSION to build the command that launches a Spark standalone cluster.
-->
<SPARK_HOME>${spark.home}</SPARK_HOME>
<SPARK_SCALA_VERSION>${scala.binary.version}</SPARK_SCALA_VERSION>
</environmentVariables>
</configuration>
</plugin>
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
@@ -210,6 +210,14 @@ object KyuubiSQLConf {
.booleanConf
.createWithDefault(false)

val FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL =
buildConf("spark.sql.finalWriteStage.eagerlyKillExecutors.killAll")
.doc("When true, eagerly kill all executors before running final write stage. " +
"Mainly for test.")
.version("1.8.0")
.booleanConf
.createWithDefault(false)

val FINAL_WRITE_STAGE_SKIP_KILLING_EXECUTORS_FOR_TABLE_CACHE =
buildConf("spark.sql.finalWriteStage.skipKillingExecutorsForTableCache")
.doc("When true, skip killing executors if the plan has table caches.")
@@ -22,6 +22,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{ExecutorAllocationClient, MapOutputTrackerMaster, SparkContext, SparkEnv}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec, SparkPlan}
@@ -188,7 +189,12 @@ case class FinalStageResourceManager(session: SparkSession)
numReduce: Int): Unit = {
val executorAllocationClient = sc.schedulerBackend.asInstanceOf[ExecutorAllocationClient]

val executorsToKill = findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
val executorsToKill =
if (conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL)) {
executorAllocationClient.getExecutorIds()
} else {
findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
}
logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " +
s"[${executorsToKill.mkString(", ")}].")
if (executorsToKill.isEmpty) {
@@ -213,6 +219,38 @@
adjustTargetNumExecutors = true,
countFailures = false,
force = false)

getAdjustedTargetExecutors(sc, executorAllocationClient)
.filter(_ < targetExecutors).foreach { adjustedExecutors =>
val delta = targetExecutors - adjustedExecutors
logInfo(s"Target executors after kill ($adjustedExecutors) is lower than required " +
s"($targetExecutors). Requesting $delta additional executor(s).")
executorAllocationClient.requestExecutors(delta)
}
}

private def getAdjustedTargetExecutors(
sc: SparkContext,
executorAllocationClient: ExecutorAllocationClient): Option[Int] = {
executorAllocationClient match {
case schedulerBackend: CoarseGrainedSchedulerBackend =>
try {
val field = classOf[CoarseGrainedSchedulerBackend]
.getDeclaredField("requestedTotalExecutorsPerResourceProfile")
field.setAccessible(true)
schedulerBackend.synchronized {
val requestedTotalExecutorsPerResourceProfile =
field.get(schedulerBackend).asInstanceOf[mutable.HashMap[ResourceProfile, Int]]
val defaultRp = sc.resourceProfileManager.defaultResourceProfile
requestedTotalExecutorsPerResourceProfile.get(defaultRp)
}
} catch {
case e: Exception =>
logWarning("Failed to get requestedTotalExecutors of Default ResourceProfile", e)
None
}
case _ => None
}
}

@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(